Author: chirino
Date: Tue Nov 18 06:30:35 2008
New Revision: 718615

URL: http://svn.apache.org/viewvc?rev=718615&view=rev
Log:
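The ReplicationServer now uses a BrokerFactory to create a new broker 
instance each time it becomes the master, and stops that instance when it 
is no longer the master.  A stopped broker service cannot be restarted, so 
creating a fresh one allows us to bring a master back online after it has 
been taken offline.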

Removed:
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicatedBrokerService.java
Modified:
    activemq/sandbox/kahadb/pom.xml
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
    
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java

Modified: activemq/sandbox/kahadb/pom.xml
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/pom.xml?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- activemq/sandbox/kahadb/pom.xml (original)
+++ activemq/sandbox/kahadb/pom.xml Tue Nov 18 06:30:35 2008
@@ -89,6 +89,12 @@
       <optional>true</optional>
     </dependency>
     <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-context</artifactId>
+      <version>2.5.5</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <version>3.0.0</version>

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
 Tue Nov 18 06:30:35 2008
@@ -69,7 +69,7 @@
 
        public void start() throws Exception {
                synchronized (serverMutex) {
-                       server = TransportFactory.bind(new 
URI(replicationServer.getNodeId()));
+                       server = TransportFactory.bind(new 
URI(replicationServer.getUri()));
                        server.setAcceptListener(new TransportAcceptListener() {
                                public void onAccept(Transport transport) {
                                        try {

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationServer.java
 Tue Nov 18 06:30:35 2008
@@ -23,6 +23,9 @@
 import java.util.zip.Checksum;
 
 import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.page.PageFile;
@@ -41,32 +44,46 @@
 
     private KahaDBStore store;
 
-       private ReplicatedBrokerService brokerService;
+       private BrokerService brokerService;
 
-       public ReplicationServer() {
-       }
-
-       public ReplicatedBrokerService getBrokerService() {
-               return brokerService;
-       }
+    private File directory = new File(IOHelper.getDefaultDataDirectory());
 
-       public void setBrokerService(ReplicatedBrokerService brokerService) {
-               this.brokerService = brokerService;
+       public ReplicationServer() {
        }
 
        public KahaDBStore getStore() {
+           if( store == null ) {
+               store = new KahaDBStore();
+               store.setDirectory(directory);
+           }
                return store;
        }
-       public void setStore(KahaDBStore store) {
+       public File getDirectory() {
+        return directory;
+    }
+
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+
+    public String getBrokerURI() {
+        return brokerURI;
+    }
+
+    public void setBrokerURI(String brokerURI) {
+        this.brokerURI = brokerURI;
+    }
+
+    public void setStore(KahaDBStore store) {
                this.store = store;
        }
 
-       public String getNodeId() {
-               return nodeId;
+       public String getUri() {
+               return uri;
        }
 
-       public void setNodeId(String nodeId) {
-               this.nodeId = nodeId;
+       public void setUri(String nodeId) {
+               this.uri = nodeId;
        }
 
        public ClusterStateManager getCluster() {
@@ -78,7 +95,7 @@
        }
 
        PageFile pageFile;
-       String nodeId;
+       String uri;
        ClusterStateManager cluster;
 
        ReplicationMaster master;
@@ -88,9 +105,12 @@
 
        private File tempReplicationDir;
 
+    private String brokerURI = "xbean:broker.xml";
+
        public void start() throws Exception {
                // The cluster will let us know about the cluster configuration,
                // which lets us decide if we are going to be a slave or a 
master.
+        getStore().open();
                cluster.addListener(this);
                cluster.start();
        }
@@ -98,6 +118,7 @@
        public void stop() throws Exception {
                cluster.removeListener(this);
                cluster.stop();
+               getStore().close();
        }
 
        public void onClusterChange(ClusterState clusterState) {
@@ -110,10 +131,11 @@
                                                LOG.info("Shutting down master 
due to cluster state change.");
                                                master.stop();
                                                master = null;
-                                               // TODO: broker service does 
not support getting restarted once it's been stopped. :(
-                                               // so at this point we need, to 
re-create the broker if we want to go back into slave 
-                                               // mode.
-                                               brokerService.stopMaster();
+                                               brokerService.stop();
+                                               brokerService=null;
+                                               // Stopping the broker service 
actually stops the store too..
+                                               // so we need to open it back 
up.
+                                               getStore().open();
                                        }
                                        // If the slave service was not yet 
started.. start it up.
                                        if (slave == null) {
@@ -132,9 +154,10 @@
                                        // If the master service was not yet 
started.. start it up.
                                        if (master == null) {
                                                LOG.info("Starting Master.");
+                                               brokerService = 
createBrokerService();
+                        brokerService.start();
                                                master = new 
ReplicationMaster(this);
                                                master.start();
-                                               brokerService.startMaster();
                                        }
                                        
                                        master.onClusterChange(clusterState);   
                                
@@ -159,16 +182,26 @@
                }
        }
 
-       public ClusterState getClusterState() {
+       public BrokerService getBrokerService() {
+        return brokerService;
+    }
+
+    private BrokerService createBrokerService() throws Exception {
+           BrokerService rc = BrokerFactory.createBroker(brokerURI );
+           rc.setPersistenceAdapter(getStore());
+           return rc;
+    }
+
+    public ClusterState getClusterState() {        
                return clusterState;
        }
 
        private boolean areWeTheSlave(ClusterState config) {
-               return config.getSlaves().contains(nodeId);
+               return config.getSlaves().contains(uri);
        }
        
        private boolean areWeTheMaster(ClusterState config) {
-               return nodeId.equals(config.getMaster());
+               return uri.equals(config.getMaster());
        }
 
        public File getReplicationFile(String fn) throws IOException {

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
 Tue Nov 18 06:30:35 2008
@@ -99,8 +99,6 @@
                                return;
                        }
                        
-                       replicationServer.getStore().open();
-                       
                        transport = TransportFactory.connect(new URI(master));
                        transport.setTransportListener(this);
                        transport.start();
@@ -111,7 +109,7 @@
                        ReplicationFrame frame = new ReplicationFrame();
                        frame.setHeader(new 
PBHeader().setType(PBType.SLAVE_INIT));
                        PBSlaveInit payload = new PBSlaveInit();
-                       payload.setNodeId(replicationServer.getNodeId());
+                       payload.setNodeId(replicationServer.getUri());
                        
                        // This call back is executed once the checkpoint is
                        // completed and all data has been
@@ -199,8 +197,6 @@
                                journalUpateFile=null;
                        }
                        journalUpdateFileId=0;
-                       
-                       replicationServer.getStore().close();
                }
        }
 
@@ -305,7 +301,6 @@
                        synchronized (transferMutex) {
                                
                                LOG.info("Slave synhcronization complete, going 
online...");
-
                                replicationServer.getStore().close();
                                
                                if( journalUpateFile!=null ) {
@@ -332,7 +327,6 @@
                                online=true;
                                
                                replicationServer.getStore().open();
-                               
                                LOG.info("Slave is now online.  We are now 
eligible to become the master.");
                        }
                        

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
 Tue Nov 18 06:30:35 2008
@@ -301,15 +301,17 @@
        
     public void unload() throws IOException, InterruptedException {
         synchronized (indexMutex) {
-            metadata.state = CLOSED_STATE;
-            metadata.firstInProgressTransactionLocation = 
getFirstInProgressTxLocation();
-
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    tx.store(metadata.page, metadataMarshaller, true);
-                }
-            });
-            close();
+            if( pageFile.isLoaded() ) {
+                metadata.state = CLOSED_STATE;
+                metadata.firstInProgressTransactionLocation = 
getFirstInProgressTxLocation();
+    
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        tx.store(metadata.page, metadataMarshaller, true);
+                    }
+                });
+                close();
+            }
         }
     }
 
@@ -462,7 +464,11 @@
     public JournalCommand load(Location location) throws IOException {
         ByteSequence data = journal.read(location);
         DataByteArrayInputStream is = new DataByteArrayInputStream(data);
-        KahaEntryType type = KahaEntryType.valueOf(is.readByte());
+        byte readByte = is.readByte();
+        KahaEntryType type = KahaEntryType.valueOf(readByte);
+        if( type == null ) {
+            throw new IOException("Could not load journal record. Invalid 
location: "+location);
+        }
         JournalCommand message = (JournalCommand)type.createMessage();
         message.mergeFramed(is);
         return message;

Modified: 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
 (original)
+++ 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
 Tue Nov 18 06:30:35 2008
@@ -16,6 +16,7 @@
  */
 package org.apache.kahadb.replication;
 
+import java.io.File;
 import java.util.Arrays;
 
 import javax.jms.Connection;
@@ -47,21 +48,19 @@
                // This cluster object will control who becomes the master.
                StaticClusterStateManager cluster = new 
StaticClusterStateManager();
                
-               ReplicatedBrokerService b1 = new ReplicatedBrokerService();
-               b1.addConnector(BROKER1_URI);
-               b1.setDataDirectory("target/replication-test/broker1");
-               b1.setBrokerName("broker1");
-               b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
-               b1.getReplicationServer().setCluster(cluster);
-               b1.start();
-               
-               ReplicatedBrokerService b2 = new ReplicatedBrokerService();
-               b2.addConnector(BROKER2_URI);
-               b2.setDataDirectory("target/replication-test/broker2");
-               b2.setBrokerName("broker2");
-               b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
-               b2.getReplicationServer().setCluster(cluster);
-               b2.start();
+               ReplicationServer rs1 = new ReplicationServer();
+               rs1.setUri(BROKER1_REPLICATION_ID);
+               rs1.setCluster(cluster);
+               rs1.setDirectory(new File("target/replication-test/broker1"));
+               rs1.setBrokerURI("broker://("+BROKER1_URI+")/broker1");
+               rs1.start();
+
+        ReplicationServer rs2 = new ReplicationServer();
+        rs2.setUri(BROKER2_REPLICATION_ID);
+        rs2.setCluster(cluster);
+        rs2.setDirectory(new File("target/replication-test/broker2"));
+        rs2.setBrokerURI("broker://(" + BROKER2_URI + ")/broker2");
+        rs2.start();
                
 //             // None of the brokers should be accepting connections since 
they are not masters.
 //             try {
@@ -108,8 +107,8 @@
                
                assertReceived(200, BROKER2_URI);
                
-               b2.stop();              
-               b1.stop();
+               rs2.stop();             
+               rs1.stop();
                
        }
 

Modified: 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java?rev=718615&r1=718614&r2=718615&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
 (original)
+++ 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
 Tue Nov 18 06:30:35 2008
@@ -16,12 +16,13 @@
  */
 package org.apache.kahadb.store.perf;
 
+import java.io.File;
 import java.util.Arrays;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.perf.SimpleQueueTest;
 import org.apache.kahadb.replication.ClusterState;
-import org.apache.kahadb.replication.ReplicatedBrokerService;
+import org.apache.kahadb.replication.ReplicationServer;
 import org.apache.kahadb.replication.StaticClusterStateManager;
 
 /**
@@ -30,13 +31,13 @@
 public class ReplicatedKahaStoreQueueTest extends SimpleQueueTest {
 
        private StaticClusterStateManager cluster;
-       private ReplicatedBrokerService b1;
-       private ReplicatedBrokerService b2;
        
        private static final String BROKER1_REPLICATION_ID = 
"kdbr://localhost:60001";
        private static final String BROKER2_REPLICATION_ID = 
"kdbr://localhost:60002";
 
     protected String broker2BindAddress="tcp://localhost:61617";
+    private ReplicationServer rs1;
+    private ReplicationServer rs2;
 
        @Override
        protected BrokerService createBroker(String uri) throws Exception {
@@ -56,37 +57,33 @@
                clusterState.setSlaves(Arrays.asList(slaves));
                cluster.setClusterState(clusterState);
 
-               b1 = new ReplicatedBrokerService();
-        b1.setDeleteAllMessagesOnStartup(true);
-        b1.addConnector(uri);
-        b1.setUseShutdownHook(false);
-
-               b1.setDataDirectory("target/test-amq-data/perfTest-b1/amqdb");
-               b1.setBrokerName("broker1");
-               b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
-               b1.getReplicationServer().setCluster(cluster);
-               b1.start();
-               
-               Thread.sleep(1000);
-               
-               b2 = new ReplicatedBrokerService();
-               b2.addConnector(broker2BindAddress);
-               b2.setDataDirectory("target/test-amq-data/perfTest-b2/amqdb");
-               b2.setBrokerName("broker1");
-               b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
-               b2.getReplicationServer().setCluster(cluster);
-               b2.start();
+        rs1 = new ReplicationServer();
+        rs1.setUri(BROKER1_REPLICATION_ID);
+        rs1.setCluster(cluster);
+        rs1.setDirectory(new File("target/replication-test/broker1"));
+        rs1.setBrokerURI("broker://("+uri+")/broker1");
+        rs1.start();
+
+        rs2 = new ReplicationServer();
+        rs2.setUri(BROKER2_REPLICATION_ID);
+        rs2.setCluster(cluster);
+        rs2.setDirectory(new File("target/replication-test/broker2"));
+        rs2.setBrokerURI("broker://(" + broker2BindAddress + ")/broker2");
+        rs2.start();
 
-               
-               return b1;
+               return rs1.getBrokerService();
        }
        
        @Override
        protected void tearDown() throws Exception {
-               if( b2!=null ) {
-                       b2.stop();
-                       b2 = null;
+               if( rs1!=null ) {
+                       rs1.stop();
+                       rs1 = null;
                }
+        if( rs2!=null ) {
+            rs2.stop();
+            rs2 = null;
+        }
        }
        
 }


Reply via email to