Author: chirino
Date: Tue Nov 11 09:28:19 2008
New Revision: 713089

URL: http://svn.apache.org/viewvc?rev=713089&view=rev
Log:
- Added a perf test to see the impact of using KahaDB replication.
- Other small fixes.

Added:
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=713089&r1=713088&r2=713089&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Tue Nov 11 09:28:19 2008
@@ -422,7 +422,23 @@
     }
 
        public synchronized void appendedExternally(Location loc, int length) 
throws IOException {
-               DataFile dataFile = getDataFile(loc);
+               DataFile dataFile = null;
+               if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() 
) {
+                       // It's an update to the current log file..
+                       dataFile = dataFiles.getTail();
+               } else if( dataFiles.getTail().getDataFileId()+1 == 
loc.getDataFileId() ) {
+                       // It's an update to the next log file.
+            int nextNum = loc.getDataFileId();
+            File file = getFile(nextNum);
+            dataFile = new DataFile(file, nextNum, preferedFileLength);
+            // actually allocate the disk space
+            fileMap.put(dataFile.getDataFileId(), dataFile);
+            fileByFileMap.put(file, dataFile);
+            dataFiles.addLast(dataFile);
+               } else {
+                       throw new IOException("Invalid external append.");
+               }
+
                dataFile.incrementLength(length);
        }
 
@@ -433,6 +449,9 @@
             if (cur == null) {
                 if (location == null) {
                     DataFile head = dataFiles.getHead();
+                    if( head == null ) {
+                       return null;
+                    }
                     cur = new Location();
                     cur.setDataFileId(head.getDataFileId());
                     cur.setOffset(0);

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=713089&r1=713088&r2=713089&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Tue Nov 11 09:28:19 2008
@@ -166,6 +166,7 @@
                                        break;
                                }
                        } catch (Exception e) {
+                               LOG.warn("Slave request failed: "+e, e);
                                failed(e);
                        }
                }
@@ -286,6 +287,9 @@
        }
 
        private PBJournalLocation convert(Location loc) {
+               if( loc==null ) {
+                       return null;
+               }
                return new 
PBJournalLocation().setFileId(loc.getDataFileId()).setOffset(loc.getOffset());
        }
 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=713089&r1=713088&r2=713089&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Tue Nov 11 09:28:19 2008
@@ -166,7 +166,7 @@
 
        public void failed(Exception error) {
                try {
-                       error.printStackTrace();
+                       LOG.warn("Replication session fail to master: 
"+transport.getRemoteAddress(), error);
                        stop();
                } catch (Exception ignore) {
                }

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java?rev=713089&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java Tue Nov 11 09:28:19 2008
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store.perf;
+
+import java.util.Arrays;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.perf.SimpleQueueTest;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.ReplicatedBrokerService;
+import org.apache.kahadb.replication.StaticClusterStateManager;
+
+/**
+ * @version $Revision: 712224 $
+ */
+public class ReplicatedKahaStoreQueueTest extends SimpleQueueTest {
+
+       private StaticClusterStateManager cluster;
+       private ReplicatedBrokerService b1;
+       private ReplicatedBrokerService b2;
+       
+       private static final String BROKER1_REPLICATION_ID = 
"kdbr://localhost:60001";
+       private static final String BROKER2_REPLICATION_ID = 
"kdbr://localhost:60002";
+
+    protected String broker2BindAddress="tcp://localhost:61617";
+
+       @Override
+       protected BrokerService createBroker(String uri) throws Exception {
+               
+           clientURI="failover:(" +
+                       
"tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&wireFormat.maxInactivityDuration=50000"
 +
+                       "," +
+                       
"tcp://localhost:61617?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&wireFormat.maxInactivityDuration=50000"
 +
+                       ")?jms.useAsyncSend=true";
+           
+               // This cluster object will control who becomes the master.
+               cluster = new StaticClusterStateManager();
+
+               ClusterState clusterState = new ClusterState();
+               clusterState.setMaster(BROKER1_REPLICATION_ID);
+               String[] slaves = {BROKER2_REPLICATION_ID};
+               clusterState.setSlaves(Arrays.asList(slaves));
+               cluster.setClusterState(clusterState);
+
+               b1 = new ReplicatedBrokerService();
+        b1.setDeleteAllMessagesOnStartup(true);
+        b1.addConnector(uri);
+        b1.setUseShutdownHook(false);
+
+               b1.setDataDirectory("target/test-amq-data/perfTest-b1/amqdb");
+               b1.setBrokerName("broker1");
+               b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
+               b1.getReplicationServer().setCluster(cluster);
+               b1.start();
+               
+               Thread.sleep(1000);
+               
+               b2 = new ReplicatedBrokerService();
+               b2.addConnector(broker2BindAddress);
+               b2.setDataDirectory("target/test-amq-data/perfTest-b2/amqdb");
+               b2.setBrokerName("broker1");
+               b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
+               b2.getReplicationServer().setCluster(cluster);
+               b2.start();
+
+               
+               return b1;
+       }
+       
+       @Override
+       protected void tearDown() throws Exception {
+               if( b2!=null ) {
+                       b2.stop();
+                       b2 = null;
+               }
+       }
+       
+}
+


Reply via email to