Author: chirino
Date: Mon Nov 10 12:41:23 2008
New Revision: 712827

URL: http://svn.apache.org/viewvc?rev=712827&view=rev
Log:
More progress on replication.. bulk file sync and real time sync now seems to 
be working.  Need to work on handling all the failure conditions now.

Added:
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java
Removed:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/ha/command/
Modified:
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
 Mon Nov 10 12:41:23 2008
@@ -347,7 +347,8 @@
 
                 // 
                 // is it just 1 big write?
-                if (wb.size == write.location.getSize()) {
+                ReplicationTarget replicationTarget = 
dataManager.getReplicationTarget();
+                if (wb.size == write.location.getSize() && 
replicationTarget==null) {
                     forceToDisk = write.sync | write.onComplete != null;
 
                     // Just write it directly..
@@ -360,7 +361,7 @@
 
                 } else {
 
-                    // Combine the smaller writes into 1 big buffer
+                    // We are going to do 1 big write.
                     while (write != null) {
                         forceToDisk |= write.sync | write.onComplete != null;
 
@@ -377,6 +378,11 @@
                     // Now do the 1 big write.
                     ByteSequence sequence = buff.toByteSequence();
                     file.write(sequence.getData(), sequence.getOffset(), 
sequence.getLength());
+                    
+                    if( replicationTarget!=null ) {
+                       
replicationTarget.replicate(wb.writes.getHead().location, sequence, 
forceToDisk);
+                    }
+                    
                     buff.reset();
                 }
 

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java 
(original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java 
Mon Nov 10 12:41:23 2008
@@ -93,7 +93,7 @@
     protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
 
     protected DataFileAppender appender;
-    protected DataFileAccessorPool accessorPool = new 
DataFileAccessorPool(this);
+    protected DataFileAccessorPool accessorPool;
 
     protected Map<Integer, DataFile> fileMap = new HashMap<Integer, 
DataFile>();
     protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, 
DataFile>();
@@ -104,6 +104,7 @@
     protected Runnable cleanupTask;
     protected final AtomicLong totalLength = new AtomicLong();
     protected boolean archiveDataLogs;
+       private ReplicationTarget replicationTarget;
 
     @SuppressWarnings("unchecked")
     public synchronized void start() throws IOException {
@@ -111,6 +112,7 @@
             return;
         }
 
+        accessorPool = new DataFileAccessorPool(this);
         started = true;
         preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - 
PREFERED_DIFF);
         lock();
@@ -176,8 +178,11 @@
             }
         }
 
-        storeState(false);
-
+        ByteSequence storedState = storeState(true);
+        if( dataFiles.isEmpty() ) {
+          appender.storeItem(storedState, Location.MARK_TYPE, true);
+        }
+        
         cleanupTask = new Runnable() {
             public void run() {
                 cleanup();
@@ -317,6 +322,9 @@
         fileByFileMap.clear();
         controlFile.unlock();
         controlFile.dispose();
+        controlFile=null;
+        dataFiles.clear();
+        lastAppendLocation.set(null);
         started = false;
     }
 
@@ -413,6 +421,11 @@
         return mark;
     }
 
+       public synchronized void appendedExternally(Location loc, int length) 
throws IOException {
+               DataFile dataFile = getDataFile(loc);
+               dataFile.incrementLength(length);
+       }
+
     public synchronized Location getNextLocation(Location location) throws 
IOException, IllegalStateException {
 
         Location cur = null;
@@ -541,10 +554,10 @@
         storeState(sync);
     }
 
-    protected synchronized void storeState(boolean sync) throws IOException {
+    protected synchronized ByteSequence storeState(boolean sync) throws 
IOException {
         ByteSequence state = marshallState();
-        appender.storeItem(state, Location.MARK_TYPE, sync);
         controlFile.store(state, sync);
+        return state;
     }
 
     public synchronized Location write(ByteSequence data, boolean sync) throws 
IOException, IllegalStateException {
@@ -660,4 +673,12 @@
         return rc;
     }
 
+       public void setReplicationTarget(ReplicationTarget replicationTarget) {
+               this.replicationTarget = replicationTarget;
+       }
+       public ReplicationTarget getReplicationTarget() {
+               return replicationTarget;
+       }
+
+
 }

Added: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java?rev=712827&view=auto
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java
 (added)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReplicationTarget.java
 Mon Nov 10 12:41:23 2008
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import org.apache.kahadb.util.ByteSequence;
+
+public interface ReplicationTarget {
+
+       void replicate(Location location, ByteSequence sequence, boolean sync);
+
+}

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java 
(original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java 
Mon Nov 10 12:41:23 2008
@@ -457,11 +457,11 @@
         return new File(directory, IOHelper.toFileSystemSafeName(name)+".dat");
     }
     
-    private File getFreeFile() {
+    public File getFreeFile() {
         return new File(directory, IOHelper.toFileSystemSafeName(name)+".fre");
     } 
 
-    private File getRecoveryFile() {
+    public File getRecoveryFile() {
         return new File(directory, IOHelper.toFileSystemSafeName(name)+".rec");
     } 
 

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
 Mon Nov 10 12:41:23 2008
@@ -23,6 +23,7 @@
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
@@ -38,15 +39,20 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.journal.DataFile;
 import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.journal.ReplicationTarget;
 import org.apache.kahadb.replication.pb.PBFileInfo;
 import org.apache.kahadb.replication.pb.PBHeader;
 import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
 import org.apache.kahadb.replication.pb.PBSlaveInit;
 import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
 import org.apache.kahadb.replication.pb.PBType;
 import org.apache.kahadb.store.KahaDBStore;
+import org.apache.kahadb.util.ByteSequence;
 
-public class ReplicationMaster implements Service, ClusterListener {
+import com.google.protobuf.ByteString;
+
+public class ReplicationMaster implements Service, ClusterListener, 
ReplicationTarget {
 
        private static final Log LOG = 
LogFactory.getLog(ReplicationServer.class);
 
@@ -55,7 +61,7 @@
        private Object serverMutex = new Object() {
        };
        private TransportServer server;
-       private ArrayList<ReplicationSession> sessions = new 
ArrayList<ReplicationSession>();
+       private CopyOnWriteArrayList<ReplicationSession> sessions = new 
CopyOnWriteArrayList<ReplicationSession>();
 
        public ReplicationMaster(ReplicationServer replication1Server) {
                this.replicationServer = replication1Server;
@@ -83,6 +89,7 @@
                        });
                        server.start();
                }
+               
replicationServer.getStore().getJournal().setReplicationTarget(this);
        }
 
        public void stop() throws Exception {
@@ -95,8 +102,34 @@
        }
 
        public void onClusterChange(ClusterState config) {
-               // TODO: if a slave is removed from the cluster, we should
-               // remove it's replication tracking info.
+       }
+
+
+       /**
+        * This is called by the Journal so that we can replicate the update to 
the 
+        * slaves.
+        */
+       @Override
+       public void replicate(Location location, ByteSequence sequence, boolean 
sync) {
+               if( sessions.isEmpty() ) 
+                       return;
+               ReplicationFrame frame = new ReplicationFrame();
+               frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
+               PBJournalUpdate payload = new PBJournalUpdate();
+               payload.setLocation(convert(location));
+               payload.setData(ByteString.copyFrom(sequence.getData(), 
sequence.getOffset(), sequence.getLength()));
+               frame.setPayload(payload);
+
+               for (ReplicationSession session : sessions) {
+                       if( session.subscribedToJournalUpdates.get() ) {
+                               // TODO: use async send threads so that the 
frames can be pushed out in parallel. 
+                               try {
+                                       session.transport.oneway(frame);
+                               } catch (IOException e) {
+                                       session.onException(e);
+                               }
+                       }
+               }
        }
 
        class ReplicationSession implements Service, TransportListener {
@@ -171,8 +204,13 @@
                                        // synced to disk, but while a lock is 
still held on the
                                        // store so that no
                                        // updates are allowed.
-
                                        ArrayList<PBFileInfo> infos = new 
ArrayList<PBFileInfo>();
+
+                                       Map<Integer, DataFile> journalFiles = 
store.getJournal().getFileMap();
+                                       for (DataFile df : 
journalFiles.values()) {
+                                               
infos.add(replicationServer.createInfo("journal-" + df.getDataFileId(), 
df.getFile(), df.getLength()));
+                                       }
+                                       
                                        SnapshotStatus snapshot = 
createSnapshot();
                                        PBFileInfo databaseInfo = new 
PBFileInfo();
                                        databaseInfo.setName("database");
@@ -181,11 +219,7 @@
                                        databaseInfo.setEnd(snapshot.size);
                                        
databaseInfo.setChecksum(snapshot.checksum);
                                        infos.add(databaseInfo);
-
-                                       Map<Integer, DataFile> journalFiles = 
store.getJournal().getFileMap();
-                                       for (DataFile df : 
journalFiles.values()) {
-                                               infos.add(createInfo("journal/" 
+ df.getDataFileId(), df.getFile(), df.getLength()));
-                                       }
+                                       
                                        rcPayload.setCopyFilesList(infos);
                                }
                        });
@@ -284,25 +318,9 @@
                }
        }
 
-       private PBFileInfo createInfo(String name, File file, int length) 
throws IOException {
-               PBFileInfo rc = new PBFileInfo();
-               rc.setName(name);
-               FileInputStream is = new FileInputStream(file);
-               byte buffer[] = new byte[1024 * 4];
-               int c;
-
-               long size = 0;
-               Checksum checksum = new Adler32();
-               while (size < length && (c = is.read(buffer, 0, (int) 
Math.min(length - size, buffer.length))) >= 0) {
-                       checksum.update(buffer, 0, c);
-                       size += c;
-               }
-               rc.setChecksum(checksum.getValue());
-               rc.setStart(0);
-               rc.setEnd(size);
-               return rc;
-       }
+
 
        private void onJournalUpdateAck(ReplicationFrame frame, 
PBJournalLocation journalLocation) {
        }
+
 }

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
 Mon Nov 10 12:41:23 2008
@@ -17,12 +17,16 @@
 package org.apache.kahadb.replication;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
 
 import org.apache.activemq.Service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.page.PageFile;
+import org.apache.kahadb.replication.pb.PBFileInfo;
 import org.apache.kahadb.store.KahaDBStore;
 
 /**
@@ -82,6 +86,8 @@
 
        private ClusterState clusterState;
 
+       private File tempReplicationDir;
+
        public void start() throws Exception {
                // The cluster will let us know about the cluster configuration,
                // which lets us decide if we are going to be a slave or a 
master.
@@ -112,6 +118,7 @@
                                        // If the slave service was not yet 
started.. start it up.
                                        if (slave == null) {
                                                LOG.info("Starting replication 
slave.");
+                                               store.open();
                                                slave = new 
ReplicationSlave(this);
                                                slave.start();
                                        }
@@ -168,10 +175,10 @@
        public File getReplicationFile(String fn) throws IOException {
                if (fn.equals("database")) {
                        return getStore().getPageFile().getFile();
-               } if (fn.startsWith("journal/")) {
+               } if (fn.startsWith("journal-")) {
                        int id;
                        try {
-                               id = 
Integer.parseInt(fn.substring("journal/".length()));
+                               id = 
Integer.parseInt(fn.substring("journal-".length()));
                        } catch (NumberFormatException e) {
                                throw new IOException("Unknown replication file 
name: "+fn);
                        }
@@ -181,6 +188,47 @@
                }
        }
 
+       public File getTempReplicationDir() {
+               if( tempReplicationDir == null ) {
+                       tempReplicationDir = new File( 
getStore().getDirectory(), "replication");
+               }
+               return tempReplicationDir;
+       }
+       
+       public File getTempReplicationFile(String fn, int snapshotId) throws 
IOException {
+               if (fn.equals("database")) {
+                       return new File(getTempReplicationDir(), 
"database-"+snapshotId);
+               } if (fn.startsWith("journal-")) {
+                       int id;
+                       try {
+                               id = 
Integer.parseInt(fn.substring("journal-".length()));
+                       } catch (NumberFormatException e) {
+                               throw new IOException("Unknown replication file 
name: "+fn);
+                       }
+                       return new File(getTempReplicationDir(), fn);
+               } else {
+                       throw new IOException("Unknown replication file name: 
"+fn);
+               }
+       }
+       
+       PBFileInfo createInfo(String name, File file, long length) throws 
IOException {
+               PBFileInfo rc = new PBFileInfo();
+               rc.setName(name);
+               FileInputStream is = new FileInputStream(file);
+               byte buffer[] = new byte[1024 * 4];
+               int c;
+
+               long size = 0;
+               Checksum checksum = new Adler32();
+               while (size < length && (c = is.read(buffer, 0, (int) 
Math.min(length - size, buffer.length))) >= 0) {
+                       checksum.update(buffer, 0, c);
+                       size += c;
+               }
+               rc.setChecksum(checksum.getValue());
+               rc.setStart(0);
+               rc.setEnd(size);
+               return rc;
+       }
        public boolean isMaster() {
                return master!=null;
        }

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
 Mon Nov 10 12:41:23 2008
@@ -16,14 +16,20 @@
  */
 package org.apache.kahadb.replication;
 
+import java.io.DataOutput;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.Service;
@@ -32,13 +38,22 @@
 import org.apache.activemq.transport.TransportListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.DataFile;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.replication.pb.PBFileInfo;
 import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
 import org.apache.kahadb.replication.pb.PBSlaveInit;
 import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
 import org.apache.kahadb.replication.pb.PBType;
+import org.apache.kahadb.store.KahaDBStore;
 
 public class ReplicationSlave implements Service, ClusterListener, 
TransportListener {
+       
+       private static final int MAX_TRANSFER_SESSIONS = 1;
+
        private static final Log LOG = 
LogFactory.getLog(ReplicationSlave.class);
 
        private final ReplicationServer replicationServer;
@@ -52,15 +67,76 @@
                transport = TransportFactory.connect(new 
URI(replicationServer.getClusterState().getMaster()));
                transport.setTransportListener(this);
                transport.start();
+
+               // Make sure the replication directory exists.
+               replicationServer.getTempReplicationDir().mkdirs();
                
                ReplicationFrame frame = new ReplicationFrame();
                frame.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
                PBSlaveInit payload = new PBSlaveInit();
                payload.setNodeId(replicationServer.getNodeId());
+               
+               // This call back is executed once the checkpoint is
+               // completed and all data has been
+               // synced to disk, but while a lock is still held on the
+               // store so that no
+               // updates are allowed.
+
+               HashMap<String, PBFileInfo> infosMap = new HashMap<String, 
PBFileInfo>();
+               
+               // Add all the files that were being transfered..
+               File tempReplicationDir = 
replicationServer.getTempReplicationDir();
+               File[] list = tempReplicationDir.listFiles();
+               if( list!=null ) {
+                       for (File file : list) {
+                               String name = file.getName();
+                               if( name.startsWith("database-") ) {
+                                       int snapshot;
+                                       try {
+                                               snapshot = 
Integer.parseInt(name.substring("database-".length()));
+                                       } catch (NumberFormatException e) {
+                                               continue;
+                                       }
+                                       
+                                       PBFileInfo info = 
replicationServer.createInfo("database", file, file.length());
+                                       info.setSnapshotId(snapshot);
+                                       infosMap.put("database", info);
+                               } else if( name.startsWith("journal-") ) {
+                                       PBFileInfo info = 
replicationServer.createInfo(name, file, file.length());
+                                       infosMap.put(name, info);
+                               }
+                       }
+               }
+               
+               // Add all the db files that were not getting transfered..
+               KahaDBStore store = replicationServer.getStore();
+               Map<Integer, DataFile> journalFiles = 
store.getJournal().getFileMap();
+               for (DataFile df : journalFiles.values()) {
+                       String name = "journal-" + df.getDataFileId();
+                       // Did we have a transfer in progress for that file 
already?
+                       if( infosMap.containsKey(name) ) {
+                               continue;
+                       }
+                       infosMap.put(name, replicationServer.createInfo(name, 
df.getFile(), df.getLength()));
+               }
+               if( !infosMap.containsKey("database") ) {
+                       File pageFile = store.getPageFile().getFile();
+                       if( pageFile.exists() ) {
+                               infosMap.put("database", 
replicationServer.createInfo("database", pageFile, pageFile.length()));
+                       }
+               }
+               
+               ArrayList<PBFileInfo> infos = new 
ArrayList<PBFileInfo>(infosMap.size());
+               for (PBFileInfo info : infosMap.values()) {
+                       infos.add(info);
+               }
+               payload.setCurrentFilesList(infos);
+               
                frame.setPayload(payload);
-               LOG.info("Sending master slave init command: "+payload);
+               LOG.info("Sending master slave init command: " + payload);
+               bulkSynchronizing = true;
                transport.oneway(frame);
-               
+
        }
 
        public void stop() throws Exception {
@@ -76,6 +152,8 @@
                        case SLAVE_INIT_RESPONSE:
                                onSlaveInitResponse(frame, 
(PBSlaveInitResponse) frame.getPayload());
                                break;
+                       case JOURNAL_UPDATE:
+                               onJournalUpdate(frame, (PBJournalUpdate) 
frame.getPayload());
                        }
                } catch (Exception e) {
                        failed(e);
@@ -102,32 +180,137 @@
 
        private Object transferMutex = new Object();
        private LinkedList<PBFileInfo> transferQueue = new 
LinkedList<PBFileInfo>();
+       private boolean bulkSynchronizing;
+       private PBSlaveInitResponse initResponse;
+
+       int journalUpdateFileId;
+       RandomAccessFile journalUpateFile;
+       
+       HashMap<String, PBFileInfo> bulkFiles = new HashMap<String, 
PBFileInfo>();
+       
+       private void onJournalUpdate(ReplicationFrame frame, PBJournalUpdate 
update) throws IOException {
+               boolean onlineRecovery=false;
+               PBJournalLocation location = update.getLocation();
+               byte[] data = update.getData().toByteArray();
+               synchronized (transferMutex) {
+                       if( journalUpateFile==null || 
journalUpdateFileId!=location.getFileId() ) {
+                               if( journalUpateFile!=null) {
+                                       journalUpateFile.close();
+                               }
+                               File file;
+                               String name = "journal-"+location.getFileId();
+                               if( bulkSynchronizing ) {
+                                       file = 
replicationServer.getTempReplicationFile(name, 0);
+                                       if( !bulkFiles.containsKey(name) ) {
+                                               bulkFiles.put(name, new 
PBFileInfo().setName(name));
+                                       }
+                               } else {
+                                       // Once the data has been synced.. we 
are going to 
+                                       // go into an online recovery mode...
+                                       file = 
replicationServer.getReplicationFile(name);
+                                       onlineRecovery=true;
+                               }
+                               journalUpateFile = new RandomAccessFile(file, 
"rw");
+                               journalUpdateFileId = location.getFileId();
+                       }
+                       journalUpateFile.seek(location.getOffset());
+                       journalUpateFile.write(data);
+               }
+               
+               if( onlineRecovery ) {
+                       KahaDBStore store = replicationServer.getStore();
+                       // Let the journal know that we appended to one of it's 
files..
+                       
store.getJournal().appendedExternally(convert(location), data.length);
+                       // Now incrementally recover those records.
+                       store.incrementalRecover();
+               }
+       }
+       
+       private Location convert(PBJournalLocation location) {
+               Location rc = new Location();
+               rc.setDataFileId(location.getFileId());
+               rc.setOffset(location.getOffset());
+               return rc;
+       }
+       
+       private void commitBulkTransfer() throws IOException {
+               synchronized (transferMutex) {
+                       
+                       journalUpateFile.close();
+                       journalUpateFile=null;
+                       replicationServer.getStore().close();
+                       
+                       // If we got a new snapshot of the database, then we 
need to 
+                       // delete it's assisting files too.
+                       if( bulkFiles.containsKey("database") ) {
+                               PageFile pageFile = 
replicationServer.getStore().getPageFile();
+                               pageFile.getRecoveryFile().delete();
+                               pageFile.getFreeFile().delete();
+                       }
+                       
+                       for (PBFileInfo info : bulkFiles.values()) {
+                               File from = 
replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+                               File to = 
replicationServer.getReplicationFile(info.getName());
+                               move(from, to);
+                       }
+                       
+                       delete(initResponse.getDeleteFilesList());
+                       bulkSynchronizing=false;
+                       
+                       replicationServer.getStore().open();
+               }
+               replicationServer.getStore().incrementalRecover();
+       }
 
        private void onSlaveInitResponse(ReplicationFrame frame, 
PBSlaveInitResponse response) throws Exception {
-               LOG.info("Got init response: "+response);
-               delete(response.getDeleteFilesList());
-               synchronized(transferMutex) {
+               LOG.info("Got init response: " + response);
+               initResponse = response;
+               
+               synchronized (transferMutex) {
+                       bulkFiles.clear();
+                       
+                       List<PBFileInfo> infos = response.getCopyFilesList();
+                       for (PBFileInfo info : infos) {
+                               
+                               bulkFiles.put(info.getName(), info);
+                               File target = 
replicationServer.getReplicationFile(info.getName());
+                               // are we just appending to an existing file 
journal file?
+                               if( info.getName().startsWith("journal-") && 
info.getStart() > 0 && target.exists() ) {
+                                       // Then copy across the first bits..
+                                       File tempFile = 
replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
+                                       
+                                       FileInputStream is = new 
FileInputStream(target);
+                                       FileOutputStream os = new 
FileOutputStream(tempFile);
+                                       try {
+                                               copy(is, os, info.getStart());
+                                       } finally {
+                                               try { is.close(); } catch 
(Throwable e){}
+                                               try { os.close(); } catch 
(Throwable e){}
+                                       }
+                               }
+                       }
+                       
+                       
                        transferQueue.clear();
-                       transferQueue.addAll(response.getCopyFilesList());
+                       transferQueue.addAll(infos);
                }
                addTransferSession();
        }
 
-               
        private PBFileInfo dequeueTransferQueue() throws Exception {
-               synchronized( transferMutex ) {
-                       if( transferQueue.isEmpty() ) {
+               synchronized (transferMutex) {
+                       if (transferQueue.isEmpty()) {
                                return null;
                        }
                        return transferQueue.removeFirst();
                }
        }
-       
+
        LinkedList<TransferSession> transferSessions = new 
LinkedList<TransferSession>();
-       
+
        private void addTransferSession() throws Exception {
-               synchronized( transferMutex ) {
-                       while( !transferQueue.isEmpty() && 
transferSessions.size()<5 ) {
+               synchronized (transferMutex) {
+                       while (!transferQueue.isEmpty() && 
transferSessions.size() < MAX_TRANSFER_SESSIONS) {
                                TransferSession transferSession = new 
TransferSession();
                                transferSessions.add(transferSession);
                                try {
@@ -136,11 +319,43 @@
                                        
transferSessions.remove(transferSession);
                                }
                        }
+                       // Once we are done processing all the transfers..
+                       if (transferQueue.isEmpty() && 
transferSessions.isEmpty()) {
+                               commitBulkTransfer();
+                       }
                }
        }
-       
-       class TransferSession implements Service, TransportListener {
+
+       private void move(File from, File to) throws IOException {
                
+               // If a simple rename/mv does not work..
+               to.delete();
+               if (!from.renameTo(to)) {
+                       
+                       // Copy and Delete.
+                       FileInputStream is = null;
+                       FileOutputStream os = null;
+                       try {
+                               is = new FileInputStream(from);
+                               os = new FileOutputStream(to);
+
+                               os.getChannel().transferFrom(is.getChannel(), 
0, is.getChannel().size());
+                       } finally {
+                               try {
+                                       is.close();
+                               } finally {
+                               }
+                               try {
+                                       os.close();
+                               } finally {
+                               }
+                       }
+                       from.delete();
+               }
+       }
+
+       class TransferSession implements Service, TransportListener {
+
                Transport transport;
                private PBFileInfo info;
                private File toFile;
@@ -158,44 +373,44 @@
                private void sendNextRequestOrStop() {
                        try {
                                PBFileInfo info = dequeueTransferQueue();
-                               if( info !=null ) {
-                               
-                                       toFile = 
replicationServer.getReplicationFile(info.getName());
+                               if (info != null) {
+
+                                       toFile = 
replicationServer.getTempReplicationFile(info.getName(), info.getSnapshotId());
                                        this.info = info;
-                                       
+
                                        ReplicationFrame frame = new 
ReplicationFrame();
                                        frame.setHeader(new 
PBHeader().setType(PBType.FILE_TRANSFER));
                                        frame.setPayload(info);
-                                       
-                                       LOG.info("Requesting file: 
"+info.getName());
+
+                                       LOG.info("Requesting file: " + 
info.getName());
                                        transferStart = 
System.currentTimeMillis();
-                                       
+
                                        transport.oneway(frame);
                                } else {
                                        stop();
                                }
-                               
-                       } catch ( Exception e ) {
+
+                       } catch (Exception e) {
                                failed(e);
                        }
                }
 
                public void stop() throws Exception {
-                       if( stopped.compareAndSet(false, true) ) {
+                       if (stopped.compareAndSet(false, true)) {
                                LOG.info("File transfer session stopped.");
-                               synchronized( transferMutex ) {
-                                       if( info!=null ) {
+                               synchronized (transferMutex) {
+                                       if (info != null) {
                                                transferQueue.addLast(info);
                                        }
                                        info = null;
                                }
-                               Thread stopThread = new Thread("Transfer 
Session Shutdown: "+transport.getRemoteAddress()) {
+                               Thread stopThread = new Thread("Transfer 
Session Shutdown: " + transport.getRemoteAddress()) {
                                        @Override
                                        public void run() {
                                                try {
                                                        transport.stop();
-                                                       synchronized( 
transferMutex ) {
-                                                               
transferSessions.remove(this);
+                                                       synchronized 
(transferMutex) {
+                                                               
transferSessions.remove(TransferSession.this);
                                                                
addTransferSession();
                                                        }
                                                } catch (Exception e) {
@@ -211,21 +426,23 @@
                public void onCommand(Object command) {
                        try {
                                ReplicationFrame frame = (ReplicationFrame) 
command;
-                               InputStream is = (InputStream) 
frame.getPayload();              
+                               InputStream is = (InputStream) 
frame.getPayload();
                                toFile.getParentFile().mkdirs();
-                               FileOutputStream os = new FileOutputStream( 
toFile );
+                               
+                               RandomAccessFile os = new 
RandomAccessFile(toFile, "rw");
+                               os.seek(info.getStart());
                                try {
                                        copy(is, os, 
frame.getHeader().getPayloadSize());
-                                       long transferTime = 
System.currentTimeMillis()-this.transferStart;
-                                       float rate = 
frame.getHeader().getPayloadSize()*transferTime/1024000f;
-                                       LOG.info("File "+info.getName()+" 
transfered in "+transferTime+" (ms) at "+rate+" Kb/Sec");
+                                       long transferTime = 
System.currentTimeMillis() - this.transferStart;
+                                       float rate = 
frame.getHeader().getPayloadSize() * transferTime / 1024000f;
+                                       LOG.info("File " + info.getName() + " 
transfered in " + transferTime + " (ms) at " + rate + " Kb/Sec");
                                } finally {
                                        os.close();
                                }
                                this.info = null;
                                this.toFile = null;
-                               
-                               sendNextRequestOrStop();                        
        
+
+                               sendNextRequestOrStop();
                        } catch (Exception e) {
                                failed(e);
                        }
@@ -237,8 +454,8 @@
 
                public void failed(Exception error) {
                        try {
-                               if( !stopped.get() ) {
-                                       LOG.warn("Replication session failure: 
"+transport.getRemoteAddress());
+                               if (!stopped.get()) {
+                                       LOG.warn("Replication session failure: 
" + transport.getRemoteAddress());
                                }
                                stop();
                        } catch (Exception ignore) {
@@ -247,21 +464,32 @@
 
                public void transportInterupted() {
                }
+
                public void transportResumed() {
                }
 
        }
-       
+
        private void copy(InputStream is, OutputStream os, long length) throws 
IOException {
                byte buffer[] = new byte[1024 * 4];
-               int c=0;
-               long pos=0;
-               while ( pos <length && ((c = is.read(buffer, 0, 
(int)Math.min(buffer.length, length-pos))) >= 0) ) {
+               int c = 0;
+               long pos = 0;
+               while (pos < length && ((c = is.read(buffer, 0, (int) 
Math.min(buffer.length, length - pos))) >= 0)) {
                        os.write(buffer, 0, c);
-                       pos+=c;
+                       pos += c;
                }
        }
-
+       
+       private void copy(InputStream is, DataOutput os, long length) throws 
IOException {
+               byte buffer[] = new byte[1024 * 4];
+               int c = 0;
+               long pos = 0;
+               while (pos < length && ((c = is.read(buffer, 0, (int) 
Math.min(buffer.length, length - pos))) >= 0)) {
+                       os.write(buffer, 0, c);
+                       pos += c;
+               }
+       }
+       
        private void delete(List<String> files) {
                for (String fn : files) {
                        try {
@@ -271,5 +499,4 @@
                }
        }
 
-
 }

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java 
(original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java 
Mon Nov 10 12:41:23 2008
@@ -51,6 +51,7 @@
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
 import org.apache.kahadb.store.data.KahaAddMessageCommand;
 import org.apache.kahadb.store.data.KahaCommitCommand;
 import org.apache.kahadb.store.data.KahaDestination;
@@ -579,6 +580,5 @@
             throw new IllegalArgumentException("Not in the valid destination 
format");
         }
     }
-    
-    
+        
 }

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
 Mon Nov 10 12:41:23 2008
@@ -51,6 +51,7 @@
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
 import org.apache.kahadb.store.data.KahaAddMessageCommand;
 import org.apache.kahadb.store.data.KahaCommitCommand;
 import org.apache.kahadb.store.data.KahaDestination;
@@ -144,11 +145,11 @@
 
     protected boolean deleteAllMessages;
     protected File directory;
-    protected boolean recovering;
     protected Thread checkpointThread;
     protected boolean syncWrites=true;
     int checkpointInterval = 5*1000;
     int cleanupInterval = 30*1000;
+    boolean opened;
     
     protected AtomicBoolean started = new AtomicBoolean();
 
@@ -167,42 +168,8 @@
         }
     }
 
-    public void load() throws IOException {
-
-        recovering=true;
-        
-        // Creates the journal if it does not yet exist.
-        getJournal();
-        if (failIfJournalIsLocked) {
-            journal.lock();
-        } else {
-            while (true) {
-                try {
-                    journal.lock();
-                    break;
-                } catch (IOException e) {
-                    LOG.info("Journal is locked... waiting " + 
(JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be 
unlocked.");
-                    try {
-                        Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
-                    } catch (InterruptedException e1) {
-                    }
-                }
-            }
-        }
-        
-        // Creates the page file if it does not yet exist.
-        getPageFile();
-
-        journal.start();
-        if (deleteAllMessages) {
-            pageFile.delete();
-            journal.delete();
-
-            LOG.info("Persistence store purged.");
-            deleteAllMessages = false;
-        }
-
-        synchronized (indexMutex) {
+       private void loadPageFile() throws IOException {
+               synchronized (indexMutex) {
             pageFile.load();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
@@ -216,8 +183,6 @@
                         metadata.destinations = new BTreeIndex<String, 
StoredDestination>(pageFile, tx.allocate().getPageId());
 
                         tx.store(metadata.page, metadataMarshaller, true);
-                        
-                        store(new KahaTraceCommand().setMessage("CREATED " + 
new Date()));
                     } else {
                         Page<Metadata> page = tx.load(0, metadataMarshaller);
                         metadata = page.get();
@@ -231,7 +196,8 @@
             pageFile.flush();
             
             // Load up all the destinations since we need to scan all the 
indexes to figure out which journal files can be deleted.
-            // Perhaps we should just keep an index of file 
+            // Perhaps we should just keep an index of file
+            storedDestinations.clear();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     for (Iterator<Entry<String, StoredDestination>> iterator = 
metadata.destinations.iterator(tx); iterator.hasNext();) {
@@ -241,13 +207,54 @@
                     }
                 }
             });
+        }
+       }
+       
+       public void open() throws IOException {
+               if( !opened ) {
+               getJournal();
+               if (failIfJournalIsLocked) {
+                   journal.lock();
+               } else {
+                   while (true) {
+                       try {
+                           journal.lock();
+                           break;
+                       } catch (IOException e) {
+                           LOG.info("Journal is locked... waiting " + 
(JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be 
unlocked.");
+                           try {
+                               Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
+                           } catch (InterruptedException e1) {
+                           }
+                       }
+                   }
+               }        
+               getPageFile();
+               journal.start();
+               loadPageFile();
+               opened=true;
+               }
+       }
+       
+    public void load() throws IOException {
+       
+       open();
+        if (deleteAllMessages) {
+            journal.delete();
 
-            // Replay the the journal to get the indexes up to date with the
-            // latest
-            // updates.
+            pageFile.unload();
+            pageFile.delete();
+            metadata = new Metadata();
+            
+            LOG.info("Persistence store purged.");
+            deleteAllMessages = false;
+            
+            loadPageFile();
+        }
+       
+        synchronized (indexMutex) {
             recover();
         }
-        recovering=false;
 
         checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
             public void run() {
@@ -276,9 +283,18 @@
             }
         };
         checkpointThread.start();
-
     }
 
+    
+       public void close() throws IOException {
+        synchronized (indexMutex) {
+            pageFile.unload();
+            metadata = new Metadata();
+        }
+        journal.close();
+        opened=false;
+       }
+       
     public void unload() throws IOException, InterruptedException {
         checkpointThread.join();
 
@@ -293,12 +309,12 @@
                 }
             });
 
-//            metadata.destinations.unload(tx);
             pageFile.unload();
             metadata = new Metadata();
         }
         store(new KahaTraceCommand().setMessage("CLEAN SHUTDOWN " + new 
Date()));
         journal.close();
+        opened=false;
     }
 
     /**
@@ -328,45 +344,61 @@
      * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException, IOException {
-
         long start = System.currentTimeMillis();
-        Location pos = null;
-        
-        // We need to recover the transactions..
-        if (metadata.firstInProgressTransactionLocation != null) {
-            pos = metadata.firstInProgressTransactionLocation;
-        }
         
-        // Perhaps there were no transactions...
-        if( pos==null && metadata.lastUpdate!=null) {
-            // Start replay at the record after the last one recorded in the 
index file.
-            pos = journal.getNextLocation(metadata.lastUpdate);
-            // No journal records need to be recovered.
-            if( pos == null ) {
-                return;
-            }
+        Location recoveryPosition = getRecoveryPosition();
+        if( recoveryPosition ==null ) {
+               return;
         }
         
-        // Do we need to start from the begining?
-        if (pos == null) {
-            // This loads the first position.
-            pos = journal.getNextLocation(null);
-        }
-
         int redoCounter = 0;
-        LOG.info("Journal Recovery Started from: " + journal + " at " + 
pos.getDataFileId() + ":" + pos.getOffset());
+        LOG.info("Journal Recovery Started from: " + journal + " at " + 
recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
 
-        while (pos != null) {
-            JournalCommand message = load(pos);
-            process(message, pos);
+        while (recoveryPosition != null) {
+            JournalCommand message = load(recoveryPosition);
+            process(message, recoveryPosition);
             redoCounter++;
-            pos = journal.getNextLocation(pos);
+            recoveryPosition = journal.getNextLocation(recoveryPosition);
         }
-
-        Location location = store(new KahaTraceCommand().setMessage("RECOVERED 
" + new Date()), true);
         long end = System.currentTimeMillis();
         LOG.info("Replayed " + redoCounter + " operations from redo log in " + 
((end - start) / 1000.0f) + " seconds.");
     }
+    
+       private Location nextRecoveryPosition;
+       private Location lastRecoveryPosition;
+
+       public void incrementalRecover() throws IOException {
+        if( nextRecoveryPosition == null ) {
+               if( lastRecoveryPosition==null ) {
+                       nextRecoveryPosition = getRecoveryPosition();
+               } else {
+                nextRecoveryPosition = 
journal.getNextLocation(lastRecoveryPosition);
+               }               
+        }
+        while (nextRecoveryPosition != null) {
+               lastRecoveryPosition = nextRecoveryPosition;
+            JournalCommand message = load(lastRecoveryPosition);
+            process(message, lastRecoveryPosition);            
+            nextRecoveryPosition = 
journal.getNextLocation(lastRecoveryPosition);
+        }
+       }
+
+       private Location getRecoveryPosition() throws IOException {
+               
+        // If we need to recover the transactions..
+        if (metadata.firstInProgressTransactionLocation != null) {
+            return metadata.firstInProgressTransactionLocation;
+        }
+        
+        // Perhaps there were no transactions...
+        if( metadata.lastUpdate!=null) {
+            // Start replay at the record after the last one recorded in the 
index file.
+            return journal.getNextLocation(metadata.lastUpdate);
+        }
+        
+        // This loads the first position.
+        return journal.getNextLocation(null);
+       }
 
     protected void checkpointCleanup(final boolean cleanup) {
         try {
@@ -420,9 +452,7 @@
         data.writeFramed(os);
         Location location = journal.write(os.toByteSequence(), sync);
         process(data, location);
-        if( !recovering ) {
-            metadata.lastUpdate = location;
-        }
+        metadata.lastUpdate = location;
         return location;
     }
 

Modified: 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=712827&r1=712826&r2=712827&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
 (original)
+++ 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
 Mon Nov 10 12:41:23 2008
@@ -21,8 +21,11 @@
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
 import junit.framework.TestCase;
 
@@ -74,7 +77,7 @@
                cluster.setClusterState(clusterState);
                
                try {
-                       sendMesagesTo(500, BROKER1_URI);
+                       sendMesagesTo(100, BROKER1_URI);
                } catch( JMSException e ) {
                        fail("b1 did not become a master.");
                }
@@ -86,13 +89,50 @@
                clusterState.setSlaves(Arrays.asList(slaves));
                cluster.setClusterState(clusterState);
                
-               Thread.sleep(10000);
+               Thread.sleep(1000);
+               
+               
+               try {
+                       sendMesagesTo(100, BROKER1_URI);
+               } catch( JMSException e ) {
+                       fail("Failed to send more messages...");
+               }
+               
+               Thread.sleep(1000);
+               
+               // Make broker 2 the master.
+               clusterState = new ClusterState();
+               clusterState.setMaster(BROKER2_REPLICATION_ID);
+               cluster.setClusterState(clusterState);
+
+               Thread.sleep(1000);
+               
+               assertReceived(200, BROKER2_URI);
                
                b2.stop();              
                b1.stop();
                
        }
 
+       private void assertReceived(int count, String brokerUri) throws 
JMSException {
+               ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory(brokerUri);
+               Connection con = cf.createConnection();
+               con.start();
+               try {
+                       Session session = con.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                       MessageConsumer consumer = 
session.createConsumer(destination);
+                       for (int i = 0; i < count; i++) {
+                               TextMessage m = (TextMessage) 
consumer.receive(1000);
+                               if( m==null ) {
+                                       fail("Failed to receive message: "+i);
+                               }
+                               System.out.println("Got: "+m.getText());
+                       }
+               } finally {
+                       try { con.close(); } catch (Throwable e) {}
+               }
+       }
+
        private void sendMesagesTo(int count, String brokerUri) throws 
JMSException {
                ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory(brokerUri);
                Connection con = cf.createConnection();


Reply via email to