Author: chirino
Date: Mon Nov 24 09:44:12 2008
New Revision: 720234

URL: http://svn.apache.org/viewvc?rev=720234&view=rev
Log:
- Switched to using camel case in the xml element names to make things 
consistent.
- Replaced the asyncReplication property in the ReplicationService with a 
minimumReplicas property.  When set to 0, async replication will be in effect.
- Also removed the use of a map to track replication requests since at most 
only 1 sync request is issued at a time.


Modified:
    
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
    
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
    
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
    
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
    activemq/trunk/kahadb/src/test/resources/broker1/ha.xml
    activemq/trunk/kahadb/src/test/resources/broker2/ha.xml

Modified: 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
 (original)
+++ 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
 Mon Nov 24 09:44:12 2008
@@ -26,7 +26,7 @@
  * he will create the actual BrokerService
  * 
  * @author chirino
- * @org.apache.xbean.XBean element="kahadb-replication-broker"
+ * @org.apache.xbean.XBean element="kahadbReplicationBroker"
  */
 public class ReplicationBrokerService extends BrokerService {
 

Modified: 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
 (original)
+++ 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
 Mon Nov 24 09:44:12 2008
@@ -23,9 +23,10 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -51,58 +52,61 @@
 import org.apache.kahadb.store.KahaDBStore;
 import org.apache.kahadb.util.ByteSequence;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 
 public class ReplicationMaster implements Service, ClusterListener, 
ReplicationTarget {
 
-       private static final Log LOG = 
LogFactory.getLog(ReplicationService.class);
+    private static final Log LOG = LogFactory.getLog(ReplicationService.class);
 
-       private final ReplicationService replicationService;
+    private final ReplicationService replicationService;
+
+    private Object serverMutex = new Object() {
+    };
+    private TransportServer server;
+
+    private ArrayList<ReplicationSession> sessions = new 
ArrayList<ReplicationSession>();
+
+    private final AtomicInteger nextSnapshotId = new AtomicInteger();
+    private final Object requestMutex = new Object(){};
+    private Location requestLocation;
+    private CountDownLatch requestLatch;
+    private int minimumReplicas;
+    
+    public ReplicationMaster(ReplicationService replicationService) {
+        this.replicationService = replicationService;
+        minimumReplicas = replicationService.getMinimumReplicas();
+    }
+
+    public void start() throws Exception {
+        synchronized (serverMutex) {
+            server = TransportFactory.bind(new 
URI(replicationService.getUri()));
+            server.setAcceptListener(new TransportAcceptListener() {
+                public void onAccept(Transport transport) {
+                    try {
+                        synchronized (serverMutex) {
+                            ReplicationSession session = new 
ReplicationSession(transport);
+                            session.start();
+                            addSession(session);
+                        }
+                    } catch (Exception e) {
+                        LOG.info("Could not accept replication connection from 
slave at " + transport.getRemoteAddress() + ", due to: " + e, e);
+                    }
+                }
+
+                public void onAcceptError(Exception e) {
+                    LOG.info("Could not accept replication connection: " + e, 
e);
+                }
+            });
+            server.start();
+        }
+        replicationService.getStore().getJournal().setReplicationTarget(this);
+    }
 
-       private Object serverMutex = new Object() {};
-       private TransportServer server;
-       
-       private ArrayList<ReplicationSession> sessions = new 
ArrayList<ReplicationSession>();
-       
-       private final AtomicInteger nextSnapshotId = new AtomicInteger();
-    private final Map<Location, CountDownLatch> requestMap = new 
LinkedHashMap<Location, CountDownLatch>();
-
-       public ReplicationMaster(ReplicationService replicationService) {
-               this.replicationService = replicationService;
-       }
-
-       public void start() throws Exception {
-               synchronized (serverMutex) {
-                       server = TransportFactory.bind(new 
URI(replicationService.getUri()));
-                       server.setAcceptListener(new TransportAcceptListener() {
-                               public void onAccept(Transport transport) {
-                                       try {
-                                               synchronized (serverMutex) {
-                                                       ReplicationSession 
session = new ReplicationSession(transport);
-                                                       session.start();
-                                                       addSession(session);
-                                               }
-                                       } catch (Exception e) {
-                                               LOG.info("Could not accept 
replication connection from slave at " + transport.getRemoteAddress() + ", due 
to: " + e, e);
-                                       }
-                               }
-
-                               public void onAcceptError(Exception e) {
-                                       LOG.info("Could not accept replication 
connection: " + e, e);
-                               }
-                       });
-                       server.start();
-               }
-               
replicationService.getStore().getJournal().setReplicationTarget(this);
-       }
-       
     boolean isStarted() {
         synchronized (serverMutex) {
-            return server!=null;
+            return server != null;
         }
     }
-    
+
     public void stop() throws Exception {
         replicationService.getStore().getJournal().setReplicationTarget(null);
         synchronized (serverMutex) {
@@ -111,24 +115,24 @@
                 server = null;
             }
         }
-        
+
         ArrayList<ReplicationSession> sessionsSnapshot;
         synchronized (this.sessions) {
             sessionsSnapshot = this.sessions;
         }
-        
-        for (ReplicationSession session: sessionsSnapshot) {
+
+        for (ReplicationSession session : sessionsSnapshot) {
             session.stop();
         }
     }
 
-       protected void addSession(ReplicationSession session) {
-           synchronized (sessions) {
-               sessions = new ArrayList<ReplicationSession>(sessions);
-               sessions.add(session);
+    protected void addSession(ReplicationSession session) {
+        synchronized (sessions) {
+            sessions = new ArrayList<ReplicationSession>(sessions);
+            sessions.add(session);
         }
     }
-       
+
     protected void removeSession(ReplicationSession session) {
         synchronized (sessions) {
             sessions = new ArrayList<ReplicationSession>(sessions);
@@ -136,352 +140,348 @@
         }
     }
 
-       public void onClusterChange(ClusterState config) {
-               // For now, we don't really care about changes in the slave 
config..
-       }
-
-       /**
-        * This is called by the Journal so that we can replicate the update to 
the 
-        * slaves.
-        */
-       public void replicate(Location location, ByteSequence sequence, boolean 
sync) {
-           ArrayList<ReplicationSession> sessionsSnapshot;
+    public void onClusterChange(ClusterState config) {
+        // For now, we don't really care about changes in the slave config..
+    }
+
+    /**
+     * This is called by the Journal so that we can replicate the update to the
+     * slaves.
+     */
+    public void replicate(Location location, ByteSequence sequence, boolean 
sync) {
+        ArrayList<ReplicationSession> sessionsSnapshot;
         synchronized (this.sessions) {
             // Hurrah for copy on write..
             sessionsSnapshot = this.sessions;
         }
-           
 
-        // We may be configured to always do async replication..
-               if ( replicationService.isAsyncReplication() ) {
-                   sync=false;
-               }
-               CountDownLatch latch=null;
-               if( sync ) {
-               latch = new CountDownLatch(1);
-            synchronized (requestMap) {
-                requestMap.put(location, latch);
-            }
-               }
-               
-               ReplicationFrame frame=null;
-               for (ReplicationSession session : sessionsSnapshot) {
-                       if( session.subscribedToJournalUpdates.get() ) {
-                           
-                           // Lazy create the frame since we may have not 
avilable sessions to send to.
-                           if( frame == null ) {
-                       frame = new ReplicationFrame();
+        // We may be able to always async replicate...
+        if (minimumReplicas==0) {
+            sync = false;
+        }
+        CountDownLatch latch = null;
+        if (sync) {
+            latch = new CountDownLatch(minimumReplicas);
+            synchronized (requestMutex) {
+                requestLatch = latch;
+                requestLocation = location;
+            }
+        }
+
+        ReplicationFrame frame = null;
+        for (ReplicationSession session : sessionsSnapshot) {
+            if (session.subscribedToJournalUpdates.get()) {
+
+                // Lazy create the frame since we may have not avilable 
sessions
+                // to send to.
+                if (frame == null) {
+                    frame = new ReplicationFrame();
                     frame.setHeader(new 
PBHeader().setType(PBType.JOURNAL_UPDATE));
                     PBJournalUpdate payload = new PBJournalUpdate();
                     payload.setLocation(ReplicationSupport.convert(location));
                     payload.setData(new 
org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), 
sequence.getLength()));
                     payload.setSendAck(sync);
                     frame.setPayload(payload);
-                           }
+                }
+
+                // TODO: use async send threads so that the frames can be 
pushed
+                // out in parallel.
+                try {
+                    session.setLastUpdateLocation(location);
+                    session.transport.oneway(frame);
+                } catch (IOException e) {
+                    session.onException(e);
+                }
+            }
+        }
 
-                               // TODO: use async send threads so that the 
frames can be pushed out in parallel. 
-                               try {
-                                   session.setLastUpdateLocation(location);
-                                       session.transport.oneway(frame);
-                               } catch (IOException e) {
-                                       session.onException(e);
-                               }
-                       }
-               }
-               
         if (sync) {
             try {
                 int timeout = 500;
-                int counter=0;
-                while( true ) {
-                    if( latch.await(timeout, TimeUnit.MILLISECONDS) ) {
-                        synchronized (requestMap) {
-                            requestMap.remove(location);
-                        }
+                int counter = 0;
+                while (true) {
+                    if (latch.await(timeout, TimeUnit.MILLISECONDS)) {
                         return;
                     }
-                    if( !isStarted() ) {
+                    if (!isStarted()) {
                         return;
                     }
                     counter++;
-                    if( (counter%10)==0 ) {
-                        LOG.warn("KahaDB is waiting for slave to come online. 
"+(timeout*counter/1000.f)+" seconds have elapsed.");
+                    if ((counter % 10) == 0) {
+                        LOG.warn("KahaDB is waiting for slave to come online. 
" + (timeout * counter / 1000.f) + " seconds have elapsed.");
                     }
-                } 
+                }
             } catch (InterruptedException ignore) {
             }
         }
-               
-       }
-       
+
+    }
+
     private void ackAllFromTo(Location lastAck, Location newAck) {
-        if ( replicationService.isAsyncReplication() ) {
+        Location l;
+        java.util.concurrent.CountDownLatch latch;
+        synchronized (requestMutex) {
+            latch = requestLatch;
+            l = requestLocation;
+        }
+        if( l == null ) {
             return;
         }
         
-        ArrayList<Entry<Location, CountDownLatch>> entries;
-        synchronized (requestMap) {
-            entries = new ArrayList<Entry<Location, 
CountDownLatch>>(requestMap.entrySet());
-        }
-        boolean inRange=false;
-        for (Entry<Location, CountDownLatch> entry : entries) {
-            Location l = entry.getKey();
-            if( !inRange ) {
-                if( lastAck==null || lastAck.compareTo(l) < 0 ) {
-                    inRange=true;
-                }
-            }
-            if( inRange ) {
-                entry.getValue().countDown();
-                if( newAck!=null && l.compareTo(newAck) <= 0 ) {
-                    return;
-                }
+        if (lastAck == null || lastAck.compareTo(l) < 0) {
+            if (newAck != null && l.compareTo(newAck) <= 0) {
+                latch.countDown();
+                return;
             }
-        }
+        } 
     }
 
+    class ReplicationSession implements Service, TransportListener {
 
-       class ReplicationSession implements Service, TransportListener {
-
-               private final Transport transport;
-               private final AtomicBoolean subscribedToJournalUpdates = new 
AtomicBoolean();
+        private final Transport transport;
+        private final AtomicBoolean subscribedToJournalUpdates = new 
AtomicBoolean();
         private boolean stopped;
-               
-               private File snapshotFile;
-               private HashSet<Integer> journalReplicatedFiles;
-               private Location lastAckLocation;
+
+        private File snapshotFile;
+        private HashSet<Integer> journalReplicatedFiles;
+        private Location lastAckLocation;
         private Location lastUpdateLocation;
         private boolean online;
 
-               public ReplicationSession(Transport transport) {
-                       this.transport = transport;
-               }
+        public ReplicationSession(Transport transport) {
+            this.transport = transport;
+        }
 
-               synchronized public void setLastUpdateLocation(Location 
lastUpdateLocation) {
+        synchronized public void setLastUpdateLocation(Location 
lastUpdateLocation) {
             this.lastUpdateLocation = lastUpdateLocation;
         }
 
         public void start() throws Exception {
-                       transport.setTransportListener(this);
-                       transport.start();
-               }
+            transport.setTransportListener(this);
+            transport.start();
+        }
 
         synchronized public void stop() throws Exception {
-                   if ( !stopped  ) { 
-                       stopped=true;
-                       deleteReplicationData();
-                       transport.stop();
-                   }
-               }
+            if (!stopped) {
+                stopped = true;
+                deleteReplicationData();
+                transport.stop();
+            }
+        }
 
-               synchronized private void onJournalUpdateAck(ReplicationFrame 
frame, PBJournalLocation location) {
+        synchronized private void onJournalUpdateAck(ReplicationFrame frame, 
PBJournalLocation location) {
             Location ack = ReplicationSupport.convert(location);
-                   if( online ) {
+            if (online) {
                 ackAllFromTo(lastAckLocation, ack);
-                   }
-            lastAckLocation=ack;
-           }
-               
-               synchronized private void onSlaveOnline(ReplicationFrame frame) 
{
+            }
+            lastAckLocation = ack;
+        }
+
+        synchronized private void onSlaveOnline(ReplicationFrame frame) {
             deleteReplicationData();
-            online  = true;
-            if( lastAckLocation!=null ) {
+            online = true;
+            if (lastAckLocation != null) {
                 ackAllFromTo(null, lastAckLocation);
             }
-            
+
         }
 
         public void onCommand(Object command) {
-                       try {
-                               ReplicationFrame frame = (ReplicationFrame) 
command;
-                               switch (frame.getHeader().getType()) {
-                               case SLAVE_INIT:
-                                       onSlaveInit(frame, (PBSlaveInit) 
frame.getPayload());
-                                       break;
-                               case SLAVE_ONLINE:
-                                       onSlaveOnline(frame);
-                                       break;
-                               case FILE_TRANSFER:
-                                       onFileTransfer(frame, (PBFileInfo) 
frame.getPayload());
-                                       break;
-                               case JOURNAL_UPDATE_ACK:
-                                       onJournalUpdateAck(frame, 
(PBJournalLocation) frame.getPayload());
-                                       break;
-                               }
-                       } catch (Exception e) {
-                               LOG.warn("Slave request failed: "+e, e);
-                               failed(e);
-                       }
-               }
-
-               public void onException(IOException error) {
-                       failed(error);
-               }
-
-               public void failed(Exception error) {
-                       try {
-                               stop();
-                       } catch (Exception ignore) {
-                       }
-               }
-
-               public void transportInterupted() {
-               }
-               public void transportResumed() {
-               }
-               
-               private void deleteReplicationData() {
-                       if( snapshotFile!=null ) {
-                               snapshotFile.delete();
-                               snapshotFile=null;
-                       }
-                       if( journalReplicatedFiles!=null ) {
-                               journalReplicatedFiles=null;
-                               updateJournalReplicatedFiles();
-                       }
-               }
-
-               private void onSlaveInit(ReplicationFrame frame, PBSlaveInit 
slaveInit) throws Exception {
-
-                       // Start sending journal updates to the slave.
-                       subscribedToJournalUpdates.set(true);
-
-                       // We could look at the slave state sent in the 
slaveInit and decide
-                       // that a full sync is not needed..
-                       // but for now we will do a full sync every time.
-                       ReplicationFrame rc = new ReplicationFrame();
-                       final PBSlaveInitResponse rcPayload = new 
PBSlaveInitResponse();
-                       rc.setHeader(new 
PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
-                       rc.setPayload(rcPayload);
-                       
-                       // Setup a map of all the files that the slave has
-                       final HashMap<String, PBFileInfo> slaveFiles = new 
HashMap<String, PBFileInfo>();
-                       for (PBFileInfo info : slaveInit.getCurrentFilesList()) 
{
-                               slaveFiles.put(info.getName(), info);
-                       }
-                       
-                       
-                       final KahaDBStore store = replicationService.getStore();
-                       store.checkpoint(new Callback() {
-                               public void execute() throws Exception {
-                                       // This call back is executed once the 
checkpoint is
-                                       // completed and all data has been 
synced to disk, 
-                                       // but while a lock is still held on 
the store so 
-                                       // that no updates are done while we 
are in this
-                                       // method.
-                                       
-                                       KahaDBStore store = 
replicationService.getStore();
-                                       if( lastAckLocation==null ) {
-                                           lastAckLocation = 
store.getLastUpdatePosition();
-                                       }
-                                       
-                                       int snapshotId = 
nextSnapshotId.incrementAndGet();
-                                       File file = 
store.getPageFile().getFile();
-                                       File dir = 
replicationService.getTempReplicationDir();
-                                       dir.mkdirs();
-                                       snapshotFile = new File(dir, 
"snapshot-" + snapshotId);
-                                       
-                                       journalReplicatedFiles = new 
HashSet<Integer>();
-                                       
-                                       // Store the list files associated with 
the snapshot.
-                                       ArrayList<PBFileInfo> snapshotInfos = 
new ArrayList<PBFileInfo>();
-                                       Map<Integer, DataFile> journalFiles = 
store.getJournal().getFileMap();
-                                       for (DataFile df : 
journalFiles.values()) {
-                                               // Look at what the slave has 
so that only the missing bits are transfered.
-                                               String name = "journal-" + 
df.getDataFileId();
-                                               PBFileInfo slaveInfo = 
slaveFiles.remove(name);
-                                               
-                                               // Use the checksum info to see 
if the slave has the file already.. Checksums are less acurrate for
-                                               // small amounts of data.. so 
ignore small files.
-                                               if( slaveInfo!=null && 
slaveInfo.getEnd()> 1024*512 ) {
-                                                       // If the slave's file 
checksum matches what we have..
-                                                       if( 
ReplicationSupport.checksum(df.getFile(), 0, 
slaveInfo.getEnd())==slaveInfo.getChecksum() ) {
-                                                               // is Our file 
longer? then we need to continue transferring the rest of the file.
-                                                               if( 
df.getLength() > slaveInfo.getEnd() ) {
-                                                                       
snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), 
slaveInfo.getEnd(), df.getLength()));
-                                                                       
journalReplicatedFiles.add(df.getDataFileId());
-                                                                       
continue;
-                                                               } else {
-                                                                       // No 
need to replicate this file.
-                                                                       
continue;
-                                                               }
-                                                       } 
-                                               }
-                                               
-                                               // If we got here then it means 
we need to transfer the whole file.
-                                               
snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), 0, 
df.getLength()));
-                                               
journalReplicatedFiles.add(df.getDataFileId());
-                                       }
-
-                                       PBFileInfo info = new PBFileInfo();
-                                       info.setName("database");
-                                       info.setSnapshotId(snapshotId);
-                                       info.setStart(0);
-                                       info.setEnd(file.length());
-                                       
info.setChecksum(ReplicationSupport.copyAndChecksum(file, snapshotFile));
-                                       snapshotInfos.add(info);
-                                       
-                                       
rcPayload.setCopyFilesList(snapshotInfos);
-                                       ArrayList<String> deleteFiles = new 
ArrayList<String>();
-                                       slaveFiles.remove("database");
-                                       for (PBFileInfo unused : 
slaveFiles.values()) {
-                                               
deleteFiles.add(unused.getName());
-                                       }
-                                       
rcPayload.setDeleteFilesList(deleteFiles);
-                                       
-                                       updateJournalReplicatedFiles();
-                               }
-
-                       });
-                       
-                       transport.oneway(rc);
-               }
-               
-               private void onFileTransfer(ReplicationFrame frame, PBFileInfo 
fileInfo) throws IOException {
-                       File file = 
replicationService.getReplicationFile(fileInfo.getName());
-                       long payloadSize = 
fileInfo.getEnd()-fileInfo.getStart();
-                       
-                       if( file.length() < fileInfo.getStart()+payloadSize ) {
-                               throw new IOException("Requested replication 
file dose not have enough data.");
-                       }               
-                       
-                       ReplicationFrame rc = new ReplicationFrame();
-                       rc.setHeader(new 
PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(payloadSize));
-                       
-                       FileInputStream is = new FileInputStream(file);
-                       rc.setPayload(is);
-                       try {
-                               is.skip(fileInfo.getStart());
-                               transport.oneway(rc);
-                       } finally {
-                               try {
-                                       is.close();
-                               } catch (Throwable e) {
-                               }
-                       }
-               }
-
-       }
-
-       /**
-        * Looks at all the journal files being currently replicated and 
informs the KahaDB so that
-        * it does not delete them while the replication is occuring.
-        */
-       private void updateJournalReplicatedFiles() {
-               HashSet<Integer>  files = 
replicationService.getStore().getJournalFilesBeingReplicated();
-               files.clear();
+            try {
+                ReplicationFrame frame = (ReplicationFrame)command;
+                switch (frame.getHeader().getType()) {
+                case SLAVE_INIT:
+                    onSlaveInit(frame, (PBSlaveInit)frame.getPayload());
+                    break;
+                case SLAVE_ONLINE:
+                    onSlaveOnline(frame);
+                    break;
+                case FILE_TRANSFER:
+                    onFileTransfer(frame, (PBFileInfo)frame.getPayload());
+                    break;
+                case JOURNAL_UPDATE_ACK:
+                    onJournalUpdateAck(frame, 
(PBJournalLocation)frame.getPayload());
+                    break;
+                }
+            } catch (Exception e) {
+                LOG.warn("Slave request failed: " + e, e);
+                failed(e);
+            }
+        }
+
+        public void onException(IOException error) {
+            failed(error);
+        }
+
+        public void failed(Exception error) {
+            try {
+                stop();
+            } catch (Exception ignore) {
+            }
+        }
+
+        public void transportInterupted() {
+        }
+
+        public void transportResumed() {
+        }
+
+        private void deleteReplicationData() {
+            if (snapshotFile != null) {
+                snapshotFile.delete();
+                snapshotFile = null;
+            }
+            if (journalReplicatedFiles != null) {
+                journalReplicatedFiles = null;
+                updateJournalReplicatedFiles();
+            }
+        }
+
        /**
         * Handles the slave's SLAVE_INIT request: enables journal-update
         * streaming to the slave, then builds a SLAVE_INIT_RESPONSE telling
         * the slave which files to copy and which to delete.  The file lists
         * are computed inside a store checkpoint callback so the data on disk
         * is stable (the store holds its lock for the duration).
         *
         * @param frame     the incoming SLAVE_INIT frame (used only for dispatch)
         * @param slaveInit lists the files the slave already has, with checksums
         * @throws Exception if the checkpoint or snapshot copy fails
         */
        private void onSlaveInit(ReplicationFrame frame, PBSlaveInit slaveInit) throws Exception {

            // Start sending journal updates to the slave.
            subscribedToJournalUpdates.set(true);

            // We could look at the slave state sent in the slaveInit and decide
            // that a full sync is not needed.. but for now we will do a full
            // sync every time.
            ReplicationFrame rc = new ReplicationFrame();
            final PBSlaveInitResponse rcPayload = new PBSlaveInitResponse();
            rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
            rc.setPayload(rcPayload);

            // Setup a map of all the files that the slave has.  Entries are
            // removed as they are matched; whatever remains at the end is
            // unneeded on the slave and scheduled for deletion.
            final HashMap<String, PBFileInfo> slaveFiles = new HashMap<String, PBFileInfo>();
            for (PBFileInfo info : slaveInit.getCurrentFilesList()) {
                slaveFiles.put(info.getName(), info);
            }

            final KahaDBStore store = replicationService.getStore();
            store.checkpoint(new Callback() {
                public void execute() throws Exception {
                    // This call back is executed once the checkpoint is
                    // completed and all data has been synced to disk,
                    // but while a lock is still held on the store so
                    // that no updates are done while we are in this
                    // method.

                    KahaDBStore store = replicationService.getStore();
                    if (lastAckLocation == null) {
                        lastAckLocation = store.getLastUpdatePosition();
                    }

                    int snapshotId = nextSnapshotId.incrementAndGet();
                    File file = store.getPageFile().getFile();
                    File dir = replicationService.getTempReplicationDir();
                    // NOTE(review): mkdirs() return value is ignored; a failure
                    // here surfaces later as a copy error — confirm acceptable.
                    dir.mkdirs();
                    snapshotFile = new File(dir, "snapshot-" + snapshotId);

                    // Journal files referenced by this snapshot are pinned so
                    // the store does not delete them mid-replication.
                    journalReplicatedFiles = new HashSet<Integer>();

                    // Store the list files associated with the snapshot.
                    ArrayList<PBFileInfo> snapshotInfos = new ArrayList<PBFileInfo>();
                    Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
                    for (DataFile df : journalFiles.values()) {
                        // Look at what the slave has so that only the missing
                        // bits are transferred.
                        String name = "journal-" + df.getDataFileId();
                        PBFileInfo slaveInfo = slaveFiles.remove(name);

                        // Use the checksum info to see if the slave has the
                        // file already.. Checksums are less accurate for
                        // small amounts of data.. so ignore small files
                        // (under 512 KiB).
                        if (slaveInfo != null && slaveInfo.getEnd() > 1024 * 512) {
                            // If the slave's file checksum matches what we
                            // have..
                            if (ReplicationSupport.checksum(df.getFile(), 0, slaveInfo.getEnd()) == slaveInfo.getChecksum()) {
                                // is Our file longer? then we need to continue
                                // transferring the rest of the file.
                                if (df.getLength() > slaveInfo.getEnd()) {
                                    snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength()));
                                    journalReplicatedFiles.add(df.getDataFileId());
                                    continue;
                                } else {
                                    // No need to replicate this file.
                                    continue;
                                }
                            }
                        }

                        // If we got here then it means we need to transfer the
                        // whole file.
                        snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
                        journalReplicatedFiles.add(df.getDataFileId());
                    }

                    // The page-file snapshot: copy it aside under the store
                    // lock and record its checksum for the slave.
                    PBFileInfo info = new PBFileInfo();
                    info.setName("database");
                    info.setSnapshotId(snapshotId);
                    info.setStart(0);
                    info.setEnd(file.length());
                    info.setChecksum(ReplicationSupport.copyAndChecksum(file, snapshotFile));
                    snapshotInfos.add(info);

                    rcPayload.setCopyFilesList(snapshotInfos);
                    // Anything the slave holds that we did not match above is
                    // stale and should be deleted on the slave side.
                    ArrayList<String> deleteFiles = new ArrayList<String>();
                    slaveFiles.remove("database");
                    for (PBFileInfo unused : slaveFiles.values()) {
                        deleteFiles.add(unused.getName());
                    }
                    rcPayload.setDeleteFilesList(deleteFiles);

                    updateJournalReplicatedFiles();
                }

            });

            transport.oneway(rc);
        }
+
+        private void onFileTransfer(ReplicationFrame frame, PBFileInfo 
fileInfo) throws IOException {
+            File file = 
replicationService.getReplicationFile(fileInfo.getName());
+            long payloadSize = fileInfo.getEnd() - fileInfo.getStart();
+
+            if (file.length() < fileInfo.getStart() + payloadSize) {
+                throw new IOException("Requested replication file dose not 
have enough data.");
+            }
+
+            ReplicationFrame rc = new ReplicationFrame();
+            rc.setHeader(new 
PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(payloadSize));
+
+            FileInputStream is = new FileInputStream(file);
+            rc.setPayload(is);
+            try {
+                is.skip(fileInfo.getStart());
+                transport.oneway(rc);
+            } finally {
+                try {
+                    is.close();
+                } catch (Throwable e) {
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Looks at all the journal files being currently replicated and informs 
the
+     * KahaDB so that it does not delete them while the replication is 
occuring.
+     */
+    private void updateJournalReplicatedFiles() {
+        HashSet<Integer> files = 
replicationService.getStore().getJournalFilesBeingReplicated();
+        files.clear();
 
         ArrayList<ReplicationSession> sessionsSnapshot;
         synchronized (this.sessions) {
             // Hurrah for copy on write..
             sessionsSnapshot = this.sessions;
         }
-        
-               for (ReplicationSession session : sessionsSnapshot) {
-                       if( session.journalReplicatedFiles !=null ) {
-                               files.addAll(session.journalReplicatedFiles);
-                       }
-               }
-       }
-       
+
+        for (ReplicationSession session : sessionsSnapshot) {
+            if (session.journalReplicatedFiles != null) {
+                files.addAll(session.journalReplicatedFiles);
+            }
+        }
+    }
+
 }

Modified: 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
 (original)
+++ 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
 Mon Nov 24 09:44:12 2008
@@ -34,7 +34,7 @@
  * slave or master facets of the broker.
  * 
  * @author chirino
- * @org.apache.xbean.XBean element="kahadb-replication"
+ * @org.apache.xbean.XBean element="kahadbReplication"
  */
 public class ReplicationService implements Service, ClusterListener {
 
@@ -47,7 +47,7 @@
     private File tempReplicationDir;
     private String uri;
     private ClusterStateManager cluster;
-    private boolean asyncReplication=false;
+    private int minimumReplicas=1;
     
     private KahaDBStore store;
 
@@ -279,12 +279,12 @@
         this.cluster = cluster;
     }
 
-    public void setAsyncReplication(boolean asyncReplication) {
-        this.asyncReplication = asyncReplication;
+    public int getMinimumReplicas() {
+        return minimumReplicas;
     }
 
-    public boolean isAsyncReplication() {
-        return asyncReplication;
+    public void setMinimumReplicas(int minimumReplicas) {
+        this.minimumReplicas = minimumReplicas;
     }
 
 

Modified: 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
 (original)
+++ 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
 Mon Nov 24 09:44:12 2008
@@ -48,7 +48,7 @@
 /**
  * 
  * @author chirino
- * @org.apache.xbean.XBean element="zookeeper-cluster"
+ * @org.apache.xbean.XBean element="zookeeperCluster"
  */
 public class ZooKeeperClusterStateManager implements ClusterStateManager, 
Watcher {
     private static final Log LOG = 
LogFactory.getLog(ZooKeeperClusterStateManager.class);

Modified: 
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- 
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
 (original)
+++ 
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
 Mon Nov 24 09:44:12 2008
@@ -49,7 +49,7 @@
                StaticClusterStateManager cluster = new 
StaticClusterStateManager();
                
                ReplicationService rs1 = new ReplicationService();
-               rs1.setAsyncReplication(true);
+               rs1.setMinimumReplicas(0);
                rs1.setUri(BROKER1_REPLICATION_ID);
                rs1.setCluster(cluster);
                rs1.setDirectory(new File("target/replication-test/broker1"));
@@ -57,7 +57,7 @@
                rs1.start();
 
         ReplicationService rs2 = new ReplicationService();
-        rs2.setAsyncReplication(true);
+        rs2.setMinimumReplicas(0);
         rs2.setUri(BROKER2_REPLICATION_ID);
         rs2.setCluster(cluster);
         rs2.setDirectory(new File("target/replication-test/broker2"));

Modified: activemq/trunk/kahadb/src/test/resources/broker1/ha.xml
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/resources/broker1/ha.xml?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/resources/broker1/ha.xml (original)
+++ activemq/trunk/kahadb/src/test/resources/broker1/ha.xml Mon Nov 24 09:44:12 
2008
@@ -27,21 +27,21 @@
 
   <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
 
-  <kahadb-replication-broker xmlns="http://activemq.apache.org/schema/kahadb";>
+  <kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb";>
        <replicationService>
-         <kahadb-replication
+         <kahadbReplication
        directory="target/kaha-data/broker1" 
        brokerURI="xbean:broker1/ha-broker.xml" 
        uri="kdbr://localhost:6001"
-       asyncReplication="true">
+       minimumReplicas="0">
        
        <cluster>
-               <zookeeper-cluster 
uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" 
password=""/>
+               <zookeeperCluster 
uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" 
password=""/>
        </cluster>
        
-      </kahadb-replication>
+      </kahadbReplication>
        </replicationService>
-  </kahadb-replication-broker>
+  </kahadbReplicationBroker>
   
 </beans>
 <!-- END SNIPPET: example -->

Modified: activemq/trunk/kahadb/src/test/resources/broker2/ha.xml
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/resources/broker2/ha.xml?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/resources/broker2/ha.xml (original)
+++ activemq/trunk/kahadb/src/test/resources/broker2/ha.xml Mon Nov 24 09:44:12 
2008
@@ -27,21 +27,21 @@
 
   <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
 
-  <kahadb-replication-broker xmlns="http://activemq.apache.org/schema/kahadb";>
+  <kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb";>
        <replicationService>
-         <kahadb-replication
+         <kahadbReplication
        directory="target/kaha-data-broker2" 
        brokerURI="xbean:broker2/ha-broker.xml" 
        uri="kdbr://localhost:6002"
-       asyncReplication="true">
+       minimumReplicas="0">
        
        <cluster>
-               <zookeeper-cluster 
uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" 
password=""/>
+               <zookeeperCluster 
uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" 
password=""/>
        </cluster>
        
-      </kahadb-replication>
+      </kahadbReplication>
        </replicationService>
-  </kahadb-replication-broker>
+  </kahadbReplicationBroker>
   
 </beans>
 <!-- END SNIPPET: example -->


Reply via email to