Author: chirino
Date: Fri Nov 21 10:03:05 2008
New Revision: 719659

URL: http://svn.apache.org/viewvc?rev=719659&view=rev
Log:
We now properly support forcing synchronous replication to slaves, which ensures
that every update is replicated to at least one slave.

Setting asyncReplication="true" disables this and allows a master to continue
operating even when no slaves are online. With asyncReplication="true" there
can be windows of time during which not all data has been replicated, so some
data loss is possible if the master fails.

Modified:
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
    activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml
    activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
 Fri Nov 21 10:03:05 2008
@@ -23,8 +23,9 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -50,32 +51,37 @@
 import org.apache.kahadb.store.KahaDBStore;
 import org.apache.kahadb.util.ByteSequence;
 
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
 public class ReplicationMaster implements Service, ClusterListener, 
ReplicationTarget {
 
        private static final Log LOG = 
LogFactory.getLog(ReplicationService.class);
 
-       private final ReplicationService replicationServer;
+       private final ReplicationService replicationService;
 
        private Object serverMutex = new Object() {};
        private TransportServer server;
-       private CopyOnWriteArrayList<ReplicationSession> sessions = new 
CopyOnWriteArrayList<ReplicationSession>();
        
-       AtomicInteger nextSnapshotId = new AtomicInteger();
+       private ArrayList<ReplicationSession> sessions = new 
ArrayList<ReplicationSession>();
+       
+       private final AtomicInteger nextSnapshotId = new AtomicInteger();
+    private final Map<Location, CountDownLatch> requestMap = new 
LinkedHashMap<Location, CountDownLatch>();
 
-       public ReplicationMaster(ReplicationService replication1Server) {
-               this.replicationServer = replication1Server;
+       public ReplicationMaster(ReplicationService replicationService) {
+               this.replicationService = replicationService;
        }
 
        public void start() throws Exception {
                synchronized (serverMutex) {
-                       server = TransportFactory.bind(new 
URI(replicationServer.getUri()));
+                       server = TransportFactory.bind(new 
URI(replicationService.getUri()));
                        server.setAcceptListener(new TransportAcceptListener() {
                                public void onAccept(Transport transport) {
                                        try {
                                                synchronized (serverMutex) {
                                                        ReplicationSession 
session = new ReplicationSession(transport);
                                                        session.start();
-                                                       sessions.add(session);
+                                                       addSession(session);
                                                }
                                        } catch (Exception e) {
                                                LOG.info("Could not accept 
replication connection from slave at " + transport.getRemoteAddress() + ", due 
to: " + e, e);
@@ -88,73 +94,204 @@
                        });
                        server.start();
                }
-               
replicationServer.getStore().getJournal().setReplicationTarget(this);
-       }
-
-       public void stop() throws Exception {
-               synchronized (serverMutex) {
-                       if (server != null) {
-                               server.stop();
-                               server = null;
-                       }
-               }
+               
replicationService.getStore().getJournal().setReplicationTarget(this);
        }
+       
+    boolean isStarted() {
+        synchronized (serverMutex) {
+            return server!=null;
+        }
+    }
+    
+    public void stop() throws Exception {
+        replicationService.getStore().getJournal().setReplicationTarget(null);
+        synchronized (serverMutex) {
+            if (server != null) {
+                server.stop();
+                server = null;
+            }
+        }
+        
+        ArrayList<ReplicationSession> sessionsSnapshot;
+        synchronized (this.sessions) {
+            sessionsSnapshot = this.sessions;
+        }
+        
+        for (ReplicationSession session: sessionsSnapshot) {
+            session.stop();
+        }
+    }
+
+    /**
+     * Registers a newly accepted slave session.
+     * <p>
+     * The sessions list is copy-on-write: readers grab the current list
+     * reference and iterate it without holding a lock, so a list must never be
+     * modified after it has been assigned to the field. The replacement is
+     * therefore fully built before it is published.
+     * <p>
+     * Writers serialize on this master's monitor, which never changes identity.
+     * Locking on the list itself is not enough because the list object is
+     * replaced on every change, letting two writers hold different monitors
+     * and lose each other's update.
+     */
+    protected synchronized void addSession(ReplicationSession session) {
+        ArrayList<ReplicationSession> updated = new ArrayList<ReplicationSession>(sessions);
+        updated.add(session);
+        publishSessions(updated);
+    }
+
+    /**
+     * Unregisters a slave session, following the same copy-on-write rules as
+     * addSession: build the new list completely, then publish it.
+     */
+    protected synchronized void removeSession(ReplicationSession session) {
+        ArrayList<ReplicationSession> updated = new ArrayList<ReplicationSession>(sessions);
+        updated.remove(session);
+        publishSessions(updated);
+    }
+
+    /**
+     * Swaps in a fully built sessions list. The caller must hold this master's
+     * monitor. The current list's monitor is also taken so that readers which
+     * synchronize on the current list observe the new reference.
+     */
+    private void publishSessions(ArrayList<ReplicationSession> updated) {
+        synchronized (sessions) {
+            sessions = updated;
+        }
+    }
 
        public void onClusterChange(ClusterState config) {
                // For now, we don't really care about changes in the slave 
config..
        }
 
-
        /**
         * This is called by the Journal so that we can replicate the update to 
the 
         * slaves.
         */
        public void replicate(Location location, ByteSequence sequence, boolean 
sync) {
-               if( sessions.isEmpty() ) 
-                       return;
-               ReplicationFrame frame = new ReplicationFrame();
-               frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
-               PBJournalUpdate payload = new PBJournalUpdate();
-               payload.setLocation(ReplicationSupport.convert(location));
-               payload.setData(new 
org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), 
sequence.getLength()));
-               frame.setPayload(payload);
-
-               for (ReplicationSession session : sessions) {
+           ArrayList<ReplicationSession> sessionsSnapshot;
+        synchronized (this.sessions) {
+            // Hurrah for copy on write..
+            sessionsSnapshot = this.sessions;
+        }
+           
+
+        // We may be configured to always do async replication..
+               if ( replicationService.isAsyncReplication() ) {
+                   sync=false;
+               }
+               CountDownLatch latch=null;
+               if( sync ) {
+               latch = new CountDownLatch(1);
+            synchronized (requestMap) {
+                requestMap.put(location, latch);
+            }
+               }
+               
+               ReplicationFrame frame=null;
+               for (ReplicationSession session : sessionsSnapshot) {
                        if( session.subscribedToJournalUpdates.get() ) {
+                           
+                           // Lazy create the frame since we may have not 
avilable sessions to send to.
+                           if( frame == null ) {
+                       frame = new ReplicationFrame();
+                    frame.setHeader(new 
PBHeader().setType(PBType.JOURNAL_UPDATE));
+                    PBJournalUpdate payload = new PBJournalUpdate();
+                    payload.setLocation(ReplicationSupport.convert(location));
+                    payload.setData(new 
org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), 
sequence.getLength()));
+                    payload.setSendAck(sync);
+                    frame.setPayload(payload);
+                           }
+
                                // TODO: use async send threads so that the 
frames can be pushed out in parallel. 
                                try {
+                                   session.setLastUpdateLocation(location);
                                        session.transport.oneway(frame);
                                } catch (IOException e) {
                                        session.onException(e);
                                }
                        }
                }
+               
+        if (sync) {
+            try {
+                int timeout = 500;
+                int counter=0;
+                while( true ) {
+                    if( latch.await(timeout, TimeUnit.MILLISECONDS) ) {
+                        synchronized (requestMap) {
+                            requestMap.remove(location);
+                        }
+                        return;
+                    }
+                    if( !isStarted() ) {
+                        return;
+                    }
+                    counter++;
+                    if( (counter%10)==0 ) {
+                        LOG.warn("KahaDB is waiting for slave to come online. 
"+(timeout*counter/1000.f)+" seconds have elapsed.");
+                    }
+                } 
+            } catch (InterruptedException ignore) {
+            }
+        }
+               
        }
+       
+    /**
+     * Releases every synchronous replicate() call waiting on a journal location
+     * in the half-open range (lastAck, newAck].
+     * <p>
+     * Locations after newAck are left pending: the slave has not acknowledged
+     * them yet, and releasing them would break the sync-replication guarantee.
+     * Each entry is range-checked individually, so the result does not depend on
+     * the iteration order of requestMap.
+     *
+     * @param lastAck location previously acknowledged by the slave (exclusive),
+     *                or null to start from the first pending request
+     * @param newAck  location just acknowledged by the slave (inclusive),
+     *                or null for no upper bound
+     */
+    private void ackAllFromTo(Location lastAck, Location newAck) {
+        if ( replicationService.isAsyncReplication() ) {
+            return;
+        }
+
+        ArrayList<Entry<Location, CountDownLatch>> entries;
+        synchronized (requestMap) {
+            entries = new ArrayList<Entry<Location, CountDownLatch>>(requestMap.entrySet());
+        }
+        for (Entry<Location, CountDownLatch> entry : entries) {
+            Location l = entry.getKey();
+            if (lastAck != null && l.compareTo(lastAck) <= 0) {
+                // Already covered by an earlier acknowledgement.
+                continue;
+            }
+            if (newAck != null && l.compareTo(newAck) > 0) {
+                // Not acknowledged by the slave yet.
+                continue;
+            }
+            entry.getValue().countDown();
+        }
+    }
+
 
        class ReplicationSession implements Service, TransportListener {
 
                private final Transport transport;
                private final AtomicBoolean subscribedToJournalUpdates = new 
AtomicBoolean();
+        private boolean stopped;
                
                private File snapshotFile;
                private HashSet<Integer> journalReplicatedFiles;
-               private boolean online;
+               private Location lastAckLocation;
+        private Location lastUpdateLocation;
+        private boolean online;
 
                public ReplicationSession(Transport transport) {
                        this.transport = transport;
                }
 
-               public void start() throws Exception {
+        /**
+         * Remembers the journal location of the most recent update handed to
+         * this session's transport.
+         */
+        public synchronized void setLastUpdateLocation(Location lastUpdateLocation) {
+            this.lastUpdateLocation = lastUpdateLocation;
+        }
+
+        public void start() throws Exception {
                        transport.setTransportListener(this);
                        transport.start();
                }
 
-               public void stop() throws Exception {
-                       deleteReplicationData();
-                       transport.stop();
+        /**
+         * Stops this session at most once: calls deleteReplicationData() and
+         * then stops the transport. Later calls do nothing.
+         */
+        public synchronized void stop() throws Exception {
+            if (stopped) {
+                return;
+            }
+            stopped = true;
+            deleteReplicationData();
+            transport.stop();
                }
 
-               public void onCommand(Object command) {
+        /**
+         * Handles an acknowledgement of a journal location from the slave.
+         * Waiting synchronous writes are only released while the slave is
+         * online; otherwise the position is just remembered so that
+         * onSlaveOnline can release up to it later.
+         */
+        private synchronized void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation location) {
+            Location ack = ReplicationSupport.convert(location);
+            boolean releaseWaiters = online;
+            if (releaseWaiters) {
+                ackAllFromTo(lastAckLocation, ack);
+            }
+            lastAckLocation = ack;
+        }
+
+        /**
+         * Marks the slave as online after clearing its replication data, then
+         * releases every pending synchronous write up to the last location the
+         * slave has acknowledged, if any.
+         */
+        private synchronized void onSlaveOnline(ReplicationFrame frame) {
+            deleteReplicationData();
+            online = true;
+            Location acked = lastAckLocation;
+            if (acked == null) {
+                return;
+            }
+            ackAllFromTo(null, acked);
+        }
+
+        public void onCommand(Object command) {
                        try {
                                ReplicationFrame frame = (ReplicationFrame) 
command;
                                switch (frame.getHeader().getType()) {
@@ -193,11 +330,6 @@
                public void transportResumed() {
                }
                
-               private void onSlaveOnline(ReplicationFrame frame) {
-                       online = true;
-                       deleteReplicationData();
-               }
-
                private void deleteReplicationData() {
                        if( snapshotFile!=null ) {
                                snapshotFile.delete();
@@ -229,7 +361,7 @@
                        }
                        
                        
-                       final KahaDBStore store = replicationServer.getStore();
+                       final KahaDBStore store = replicationService.getStore();
                        store.checkpoint(new Callback() {
                                public void execute() throws Exception {
                                        // This call back is executed once the 
checkpoint is
@@ -238,11 +370,14 @@
                                        // that no updates are done while we 
are in this
                                        // method.
                                        
-                                       KahaDBStore store = 
replicationServer.getStore();
-
+                                       KahaDBStore store = 
replicationService.getStore();
+                                       if( lastAckLocation==null ) {
+                                           lastAckLocation = 
store.getLastUpdatePosition();
+                                       }
+                                       
                                        int snapshotId = 
nextSnapshotId.incrementAndGet();
                                        File file = 
store.getPageFile().getFile();
-                                       File dir = 
replicationServer.getTempReplicationDir();
+                                       File dir = 
replicationService.getTempReplicationDir();
                                        dir.mkdirs();
                                        snapshotFile = new File(dir, 
"snapshot-" + snapshotId);
                                        
@@ -294,7 +429,6 @@
                                        }
                                        
rcPayload.setDeleteFilesList(deleteFiles);
                                        
-                                       
                                        updateJournalReplicatedFiles();
                                }
 
@@ -304,7 +438,7 @@
                }
                
                private void onFileTransfer(ReplicationFrame frame, PBFileInfo 
fileInfo) throws IOException {
-                       File file = 
replicationServer.getReplicationFile(fileInfo.getName());
+                       File file = 
replicationService.getReplicationFile(fileInfo.getName());
                        long payloadSize = 
fileInfo.getEnd()-fileInfo.getStart();
                        
                        if( file.length() < fileInfo.getStart()+payloadSize ) {
@@ -334,17 +468,20 @@
         * it does not delete them while the replication is occuring.
         */
        private void updateJournalReplicatedFiles() {
-               HashSet<Integer>  files = 
replicationServer.getStore().getJournalFilesBeingReplicated();
+               HashSet<Integer>  files = 
replicationService.getStore().getJournalFilesBeingReplicated();
                files.clear();
-               for (ReplicationSession session : sessions) {
+
+        ArrayList<ReplicationSession> sessionsSnapshot;
+        synchronized (this.sessions) {
+            // Hurrah for copy on write..
+            sessionsSnapshot = this.sessions;
+        }
+        
+               for (ReplicationSession session : sessionsSnapshot) {
                        if( session.journalReplicatedFiles !=null ) {
                                files.addAll(session.journalReplicatedFiles);
                        }
                }
        }
        
-
-       private void onJournalUpdateAck(ReplicationFrame frame, 
PBJournalLocation journalLocation) {
-       }
-
 }

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
 Fri Nov 21 10:03:05 2008
@@ -47,6 +47,7 @@
     private File tempReplicationDir;
     private String uri;
     private ClusterStateManager cluster;
+    private boolean asyncReplication=false;
     
     private KahaDBStore store;
 
@@ -278,5 +279,14 @@
         this.cluster = cluster;
     }
 
+    /**
+     * Chooses whether synchronous journal writes wait for a slave
+     * acknowledgement. When true they never wait, so a master can keep running
+     * with no slave online, at the cost of possible data loss.
+     *
+     * @param asyncReplication true to never wait for slave acknowledgements
+     */
+    public void setAsyncReplication(boolean asyncReplication) {
+        this.asyncReplication = asyncReplication;
+    }
+
+    /**
+     * @return true if synchronous writes skip waiting for a slave acknowledgement
+     */
+    public boolean isAsyncReplication() {
+        final boolean async = this.asyncReplication;
+        return async;
+    }
+
+
 
 }

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
 Fri Nov 21 10:03:05 2008
@@ -255,6 +255,20 @@
        }
        
        private void onJournalUpdate(ReplicationFrame frame, PBJournalUpdate 
update) throws IOException {
+           
+           // Send an ack back as soon as we receive the update. Acking before the data is on disk
+           // is a little dirty, but it is unlikely that both machines lose power at the same time,
+           // and acking early reduces the latency the master sees from us.
+           if( update.getSendAck() ) {
+               ReplicationFrame ack = new ReplicationFrame();
+               ack.setHeader(new 
PBHeader().setType(PBType.JOURNAL_UPDATE_ACK));
+               ack.setPayload(update.getLocation());
+               transport.oneway(ack);
+           }
+           
+           // TODO: do the disk write in an async thread so that this thread can
+           // start reading the next journal update.
+           
                boolean onlineRecovery=false;
                PBJournalLocation location = update.getLocation();
                byte[] data = update.getData().toByteArray();

Modified: 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
 (original)
+++ 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
 Fri Nov 21 10:03:05 2008
@@ -49,6 +49,7 @@
                StaticClusterStateManager cluster = new 
StaticClusterStateManager();
                
                ReplicationService rs1 = new ReplicationService();
+               rs1.setAsyncReplication(true);
                rs1.setUri(BROKER1_REPLICATION_ID);
                rs1.setCluster(cluster);
                rs1.setDirectory(new File("target/replication-test/broker1"));
@@ -56,6 +57,7 @@
                rs1.start();
 
         ReplicationService rs2 = new ReplicationService();
+        rs2.setAsyncReplication(true);
         rs2.setUri(BROKER2_REPLICATION_ID);
         rs2.setCluster(cluster);
         rs2.setDirectory(new File("target/replication-test/broker2"));

Modified: activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml (original)
+++ activemq/sandbox/kahadb/src/test/resources/broker1/ha.xml Fri Nov 21 
10:03:05 2008
@@ -32,7 +32,8 @@
          <kahadb-replication
        directory="target/kaha-data/broker1" 
        brokerURI="xbean:broker1/ha-broker.xml" 
-       uri="kdbr://localhost:6001">
+       uri="kdbr://localhost:6001"
+       asyncReplication="true">
        
        <cluster>
                <zookeeper-cluster 
uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" 
password=""/>

Modified: activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml?rev=719659&r1=719658&r2=719659&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml (original)
+++ activemq/sandbox/kahadb/src/test/resources/broker2/ha.xml Fri Nov 21 
10:03:05 2008
@@ -32,7 +32,8 @@
          <kahadb-replication
        directory="target/kaha-data-broker2" 
        brokerURI="xbean:broker2/ha-broker.xml" 
-       uri="kdbr://localhost:6002">
+       uri="kdbr://localhost:6002"
+       asyncReplication="true">
        
        <cluster>
                <zookeeper-cluster 
uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" 
password=""/>


Reply via email to