Author: chirino
Date: Tue Nov 11 09:28:19 2008
New Revision: 713089
URL: http://svn.apache.org/viewvc?rev=713089&view=rev
Log:
- Added a perf test to see the impact of using KahaDB replication.
- Other small fixes.
Added:
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=713089&r1=713088&r2=713089&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Tue Nov 11 09:28:19 2008
@@ -422,7 +422,23 @@
}
public synchronized void appendedExternally(Location loc, int length) throws IOException {
- DataFile dataFile = getDataFile(loc);
+ DataFile dataFile = null;
+ if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
+ // It's an update to the current log file..
+ dataFile = dataFiles.getTail();
+ } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
+ // It's an update to the next log file.
+ int nextNum = loc.getDataFileId();
+ File file = getFile(nextNum);
+ dataFile = new DataFile(file, nextNum, preferedFileLength);
+ // actually allocate the disk space
+ fileMap.put(dataFile.getDataFileId(), dataFile);
+ fileByFileMap.put(file, dataFile);
+ dataFiles.addLast(dataFile);
+ } else {
+ throw new IOException("Invalid external append.");
+ }
+
dataFile.incrementLength(length);
}
@@ -433,6 +449,9 @@
if (cur == null) {
if (location == null) {
DataFile head = dataFiles.getHead();
+ if( head == null ) {
+ return null;
+ }
cur = new Location();
cur.setDataFileId(head.getDataFileId());
cur.setOffset(0);
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=713089&r1=713088&r2=713089&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Tue Nov 11 09:28:19 2008
@@ -166,6 +166,7 @@
break;
}
} catch (Exception e) {
+ LOG.warn("Slave request failed: "+e, e);
failed(e);
}
}
@@ -286,6 +287,9 @@
}
private PBJournalLocation convert(Location loc) {
+ if( loc==null ) {
+ return null;
+ }
return new PBJournalLocation().setFileId(loc.getDataFileId()).setOffset(loc.getOffset());
}
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=713089&r1=713088&r2=713089&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
(original)
+++
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
Tue Nov 11 09:28:19 2008
@@ -166,7 +166,7 @@
public void failed(Exception error) {
try {
- error.printStackTrace();
+ LOG.warn("Replication session fail to master: "+transport.getRemoteAddress(), error);
stop();
} catch (Exception ignore) {
}
Added:
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java?rev=713089&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/ReplicatedKahaStoreQueueTest.java Tue Nov 11 09:28:19 2008
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store.perf;
+
+import java.util.Arrays;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.perf.SimpleQueueTest;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.ReplicatedBrokerService;
+import org.apache.kahadb.replication.StaticClusterStateManager;
+
+/**
+ * @version $Revision: 712224 $
+ */
+public class ReplicatedKahaStoreQueueTest extends SimpleQueueTest {
+
+ private StaticClusterStateManager cluster;
+ private ReplicatedBrokerService b1;
+ private ReplicatedBrokerService b2;
+
+ private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001";
+ private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
+
+ protected String broker2BindAddress="tcp://localhost:61617";
+
+ @Override
+ protected BrokerService createBroker(String uri) throws Exception {
+
+ clientURI="failover:(" +
+ "tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&wireFormat.maxInactivityDuration=50000"
+ + "," +
+ "tcp://localhost:61617?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&wireFormat.maxInactivityDuration=50000"
+ + ")?jms.useAsyncSend=true";
+
+ // This cluster object will control who becomes the master.
+ cluster = new StaticClusterStateManager();
+
+ ClusterState clusterState = new ClusterState();
+ clusterState.setMaster(BROKER1_REPLICATION_ID);
+ String[] slaves = {BROKER2_REPLICATION_ID};
+ clusterState.setSlaves(Arrays.asList(slaves));
+ cluster.setClusterState(clusterState);
+
+ b1 = new ReplicatedBrokerService();
+ b1.setDeleteAllMessagesOnStartup(true);
+ b1.addConnector(uri);
+ b1.setUseShutdownHook(false);
+
+ b1.setDataDirectory("target/test-amq-data/perfTest-b1/amqdb");
+ b1.setBrokerName("broker1");
+ b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID);
+ b1.getReplicationServer().setCluster(cluster);
+ b1.start();
+
+ Thread.sleep(1000);
+
+ b2 = new ReplicatedBrokerService();
+ b2.addConnector(broker2BindAddress);
+ b2.setDataDirectory("target/test-amq-data/perfTest-b2/amqdb");
+ b2.setBrokerName("broker1");
+ b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID);
+ b2.getReplicationServer().setCluster(cluster);
+ b2.start();
+
+
+ return b1;
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ if( b2!=null ) {
+ b2.stop();
+ b2 = null;
+ }
+ }
+
+}
+