Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=712217&r1=712216&r2=712217&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Fri Nov 7 10:16:10 2008 @@ -28,9 +28,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.TreeMap; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,20 +38,19 @@ import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.util.Callback; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kahadb.LongMarshaller; import org.apache.kahadb.Marshaller; import org.apache.kahadb.StringMarshaller; import org.apache.kahadb.index.BTreeIndex; -import org.apache.kahadb.index.BTreeNode; import org.apache.kahadb.index.BTreeVisitor; import org.apache.kahadb.journal.Journal; import org.apache.kahadb.journal.Location; import org.apache.kahadb.page.Page; import org.apache.kahadb.page.PageFile; import org.apache.kahadb.page.Transaction; -import org.apache.kahadb.page.Transaction.PageOverflowIOException; import org.apache.kahadb.store.data.KahaAddMessageCommand; import org.apache.kahadb.store.data.KahaCommitCommand; import org.apache.kahadb.store.data.KahaDestination; @@ -70,8 +67,6 @@ import org.apache.kahadb.util.ByteSequence; import org.apache.kahadb.util.DataByteArrayInputStream; import org.apache.kahadb.util.DataByteArrayOutputStream; -import 
org.apache.kahadb.util.Sequence; -import org.apache.kahadb.util.SequenceSet; public class MessageDatabase { @@ -140,7 +135,7 @@ } protected PageFile pageFile; - protected Journal asyncDataManager; + protected Journal journal; protected Metadata metadata = new Metadata(); protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); @@ -175,15 +170,15 @@ public void load() throws IOException { recovering=true; - if (asyncDataManager == null) { - asyncDataManager = createAsyncDataManager(); - } + + // Creates the journal if it does not yet exist. + getJournal(); if (failIfJournalIsLocked) { - asyncDataManager.lock(); + journal.lock(); } else { while (true) { try { - asyncDataManager.lock(); + journal.lock(); break; } catch (IOException e) { LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked."); @@ -194,16 +189,14 @@ } } } - if (pageFile == null) { - pageFile = createPageFile(); - } + + // Creates the page file if it does not yet exist. + getPageFile(); - asyncDataManager.start(); + journal.start(); if (deleteAllMessages) { pageFile.delete(); - asyncDataManager.delete(); - - store(new KahaTraceCommand().setMessage("DELETED " + new Date())); + journal.delete(); LOG.info("Persistence store purged."); deleteAllMessages = false; @@ -269,11 +262,11 @@ Thread.sleep(sleepTime); long now = System.currentTimeMillis(); if( now - lastCleanup >= cleanupInterval ) { - checkpoint(true); + checkpointCleanup(true); lastCleanup = now; lastCheckpoint = now; } else if( now - lastCheckpoint >= checkpointInterval ) { - checkpoint(false); + checkpointCleanup(false); lastCheckpoint = now; } } @@ -305,7 +298,7 @@ metadata = new Metadata(); } store(new KahaTraceCommand().setMessage("CLEAN SHUTDOWN " + new Date())); - asyncDataManager.close(); + journal.close(); } /** @@ -347,7 +340,7 @@ // Perhaps there were no transactions... 
if( pos==null && metadata.lastUpdate!=null) { // Start replay at the record after the last one recorded in the index file. - pos = asyncDataManager.getNextLocation(metadata.lastUpdate); + pos = journal.getNextLocation(metadata.lastUpdate); // No journal records need to be recovered. if( pos == null ) { return; @@ -357,17 +350,17 @@ // Do we need to start from the begining? if (pos == null) { // This loads the first position. - pos = asyncDataManager.getNextLocation(null); + pos = journal.getNextLocation(null); } int redoCounter = 0; - LOG.info("Journal Recovery Started from: " + asyncDataManager + " at " + pos.getDataFileId() + ":" + pos.getOffset()); + LOG.info("Journal Recovery Started from: " + journal + " at " + pos.getDataFileId() + ":" + pos.getOffset()); while (pos != null) { JournalCommand message = load(pos); process(message, pos); redoCounter++; - pos = asyncDataManager.getNextLocation(pos); + pos = journal.getNextLocation(pos); } Location location = store(new KahaTraceCommand().setMessage("RECOVERED " + new Date()), true); @@ -375,7 +368,7 @@ LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds."); } - private void checkpoint(final boolean cleanup) { + protected void checkpointCleanup(final boolean cleanup) { try { synchronized (indexMutex) { pageFile.tx().execute(new Transaction.Closure<IOException>() { @@ -390,6 +383,23 @@ } } + + public void checkpoint(Callback closure) throws Exception { + try { + synchronized (indexMutex) { + pageFile.tx().execute(new Transaction.Closure<IOException>() { + public void execute(Transaction tx) throws IOException { + checkpointUpdate(tx, false); + } + }); + pageFile.flush(); + closure.execute(); + } + store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true); + } catch (IOException e) { + } + } + // ///////////////////////////////////////////////////////////////// // Methods call by the broker to update and query the store. 
// ///////////////////////////////////////////////////////////////// @@ -408,7 +418,7 @@ DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); os.writeByte(data.type().getNumber()); data.writeFramed(os); - Location location = asyncDataManager.write(os.toByteSequence(), sync); + Location location = journal.write(os.toByteSequence(), sync); process(data, location); if( !recovering ) { metadata.lastUpdate = location; @@ -424,7 +434,7 @@ * @throws IOException */ public JournalCommand load(Location location) throws IOException { - ByteSequence data = asyncDataManager.read(location); + ByteSequence data = journal.read(location); DataByteArrayInputStream is = new DataByteArrayInputStream(data); KahaEntryType type = KahaEntryType.valueOf(is.readByte()); JournalCommand message = (JournalCommand)type.createMessage(); @@ -742,7 +752,7 @@ } LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l); - asyncDataManager.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId()); + journal.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId()); } LOG.debug("Checkpoint done."); @@ -1110,7 +1120,7 @@ return pf; } - private Journal createAsyncDataManager() { + private Journal createJournal() { Journal manager = new Journal(); manager.setDirectory(new File(directory, "journal")); manager.setMaxFileLength(1024 * 1024 * 20); @@ -1158,4 +1168,17 @@ this.cleanupInterval = cleanupInterval; } + public PageFile getPageFile() { + if (pageFile == null) { + pageFile = createPageFile(); + } + return pageFile; + } + + public Journal getJournal() { + if (journal == null) { + journal = createJournal(); + } + return journal; + } }
Added: activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto?rev=712217&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto (added) +++ activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto Fri Nov 7 10:16:10 2008 @@ -0,0 +1,99 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.apache.kahadb.replication.pb; + +option java_multiple_files = true; +option java_outer_classname = "PB"; + +// +// +// +message PBHeader { + required PBType type=1; + optional int64 payload_size=2; +} + +enum PBType { + + // Sent from the slave to the master when the slave first starts. It lets the master + // know about the slave's synchronization state. This allows the master to decide how to best synchronize + // the slave. + // + // @followed-by PBSlaveInit + SLAVE_INIT = 0; + + // The Master will send this response back to the slave, letting it know what it needs to do to get + // its data files synchronized with the master. 
+ // + // @followed-by PBSlaveInitResponse + SLAVE_INIT_RESPONSE = 1; + + // Sent from the Master to the slave to replicate a Journal update. + // + // @followed-by PBJournalUpdate + JOURNAL_UPDATE=3; + + // An ack sent from the Slave to a master to let the master know up to where in the journal the slave has + // synchronized to. This acknowledges receipt of all previous journal records. This should not be sent until + // all bulk file copies are complete. + // + // @followed-by PBJournalLocation + JOURNAL_UPDATE_ACK=4; + + // A Request for a bulk file transfer. Sent from a slave to a Master + // + // @followed-by PBFileInfo + FILE_TRANSFER=5; + + // A bulk file transfer response + // + // @followed-by the bytes of the requested file. + FILE_TRANSFER_RESPONSE=6; +} + +message PBFileInfo { + required string name=1; + optional int32 snapshot_id=2; + optional sfixed64 checksum=3; + optional int64 start=4; + optional int64 end=5; +} + +message PBJournalLocation { + required int32 file_id=1; + required int32 offset=2; +} + +message PBSlaveInit { + // The id of the slave node that is being initialized + required string node_id=1; + // The files that the slave node currently has + repeated PBFileInfo current_files=2; +} + +message PBSlaveInitResponse { + // The files that the slave should bulk copy from the master.. 
+ repeated PBFileInfo copy_files=1; + // The files that the slave should delete + repeated string delete_files=2; +} + +message PBJournalUpdate { + required PBJournalLocation location=1; + required bytes data=2; +} + Added: activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr?rev=712217&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr (added) +++ activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr Fri Nov 7 10:16:10 2008 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- +class=org.apache.kahadb.replication.transport.KDBRTransportFactory Added: activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr?rev=712217&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr (added) +++ activemq/sandbox/kahadb/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr Fri Nov 7 10:16:10 2008 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- +class=org.apache.kahadb.replication.transport.KDBRWireFormatFactory Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=712217&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Fri Nov 7 10:16:10 2008 @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kahadb.replication; + +import java.util.Arrays; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQQueue; + +public class ReplicationTest extends TestCase { + + + private static final String BROKER1_URI = "tcp://localhost:61001"; + private static final String BROKER2_URI = "tcp://localhost:61002"; + + private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001"; + private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002"; + + private Destination destination = new ActiveMQQueue("TEST_QUEUE"); + + public void testReplication() throws Exception { + + // This cluster object will control who becomes the master. + StaticClusterStateManager cluster = new StaticClusterStateManager(); + + ReplicatedBrokerService b1 = new ReplicatedBrokerService(); + b1.addConnector(BROKER1_URI); + b1.setDataDirectory("target/replication-test/broker1"); + b1.setBrokerName("broker1"); + b1.getReplicationServer().setNodeId(BROKER1_REPLICATION_ID); + b1.getReplicationServer().setCluster(cluster); + b1.start(); + + ReplicatedBrokerService b2 = new ReplicatedBrokerService(); + b2.addConnector(BROKER2_URI); + b2.setDataDirectory("target/replication-test/broker2"); + b2.setBrokerName("broker2"); + b2.getReplicationServer().setNodeId(BROKER2_REPLICATION_ID); + b2.getReplicationServer().setCluster(cluster); + b2.start(); + +// // None of the brokers should be accepting connections since they are not masters. +// try { +// sendMesagesTo(1, BROKER1_URI); +// fail("Connection failure expected."); +// } catch( JMSException e ) { +// } + + // Make b1 the master. 
+ ClusterState clusterState = new ClusterState(); + clusterState.setMaster(BROKER1_REPLICATION_ID); + cluster.setClusterState(clusterState); + + try { + sendMesagesTo(500, BROKER1_URI); + } catch( JMSException e ) { + fail("b1 did not become a master."); + } + + // Make broker 2 a slave. + clusterState = new ClusterState(); + clusterState.setMaster(BROKER1_REPLICATION_ID); + String[] slaves = {BROKER2_REPLICATION_ID}; + clusterState.setSlaves(Arrays.asList(slaves)); + cluster.setClusterState(clusterState); + + Thread.sleep(10000); + + b2.stop(); + b1.stop(); + + } + + private void sendMesagesTo(int count, String brokerUri) throws JMSException { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri); + Connection con = cf.createConnection(); + try { + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < count; i++) { + producer.send(session.createTextMessage("Hello: "+i)); + } + } finally { + try { con.close(); } catch (Throwable e) {} + } + } + +} Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java?rev=712217&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java Fri Nov 7 10:16:10 2008 @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.replication.transport; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportAcceptListener; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.TransportServer; +import org.apache.kahadb.replication.ReplicationFrame; +import org.apache.kahadb.replication.pb.PBHeader; +import org.apache.kahadb.replication.pb.PBJournalLocation; +import org.apache.kahadb.replication.pb.PBSlaveInit; +import org.apache.kahadb.replication.pb.PBType; + +public class KDBRTransportTest extends TestCase { + + private static final String KDBR_URI = "kdbr://localhost:61618"; + private List<Object> serverQueue; + private List<Object> clientQueue; + private List<Transport> serverTransports; + private TransportServer server; + private Transport client; + + private Object commandLatchMutex = new Object(); + private CountDownLatch commandLatch; + + protected void 
releaseCommandLatch() { + synchronized( commandLatchMutex ) { + if( commandLatch == null ) { + return; + } + commandLatch.countDown(); + commandLatch=null; + } + } + + protected CountDownLatch getCommandLatch() { + synchronized( commandLatchMutex ) { + if( commandLatch == null ) { + commandLatch = new CountDownLatch(1); + } + return commandLatch; + } + } + + @Override + protected void setUp() throws Exception { + serverQueue = Collections.synchronizedList(new ArrayList<Object>()); + clientQueue = Collections.synchronizedList(new ArrayList<Object>()); + serverTransports = Collections.synchronizedList(new ArrayList<Transport>()); + + // Setup a server + server = TransportFactory.bind(new URI(KDBR_URI)); + server.setAcceptListener(new TransportAcceptListener() { + public void onAccept(Transport transport) { + try { + transport.setTransportListener(new TransportListener() { + public void onCommand(Object command) { + try { + serverQueue.add(command); + process(command); + releaseCommandLatch(); + } catch (IOException e) { + onException(e); + } + } + + public void onException(IOException error) { + serverQueue.add(error); + serverTransports.remove(this); + releaseCommandLatch(); + } + + public void transportInterupted() { + } + + public void transportResumed() { + } + }); + transport.start(); + serverTransports.add(transport); + } catch (Exception e) { + onAcceptError(e); + } + } + + public void onAcceptError(Exception error) { + error.printStackTrace(); + } + }); + server.start(); + + // Connect a client. 
+ client = TransportFactory.connect(new URI(KDBR_URI)); + client.setTransportListener(new TransportListener() { + public void onCommand(Object command) { + clientQueue.add(command); + releaseCommandLatch(); + } + + public void onException(IOException error) { + clientQueue.add(error); + releaseCommandLatch(); + } + + public void transportInterupted() { + } + + public void transportResumed() { + } + }); + client.start(); + } + + @Override + protected void tearDown() throws Exception { + client.stop(); + server.stop(); + } + + private void process(Object command) throws IOException { + ReplicationFrame frame = (ReplicationFrame) command; + // Since we are processing the commands async in this test case we need to fully read the stream before + // returning since it will be used to read the next command once we return. + if( frame.getHeader().getType() == PBType.FILE_TRANSFER_RESPONSE ) { + InputStream ais = (InputStream) frame.getPayload(); + byte actualPayload[] = new byte[(int)frame.getHeader().getPayloadSize()]; + readFully(ais, actualPayload); + frame.setPayload(actualPayload); + } + } + + /** + * Test a frame that has a streaming payload. 
+ * + * @throws Exception + */ + public void testFileTransferResponse() throws Exception { + + byte expectedPayload[] = {1,2,3,4,5,6,7,8,9,10}; + + ReplicationFrame expected = new ReplicationFrame(); + expected.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(expectedPayload.length)); + ByteArrayInputStream is = new ByteArrayInputStream(expectedPayload); + expected.setPayload(is); + + CountDownLatch latch = getCommandLatch(); + client.oneway(expected); + is.close(); + latch.await(2, TimeUnit.SECONDS); + + assertEquals(1, serverQueue.size()); + ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0); + + assertEquals(expected.getHeader(), actual.getHeader()); + assertTrue(Arrays.equals(expectedPayload, (byte[])actual.getPayload())); + + } + + + /** + * Test out sending a frame that has a PB payload. + * + * @throws Exception + */ + public void testPBSlaveInitFrame() throws Exception { + + + ReplicationFrame expected = new ReplicationFrame(); + expected.setHeader(new PBHeader().setType(PBType.SLAVE_INIT)); + expected.setPayload(new PBSlaveInit().setNodeId("foo")); + + CountDownLatch latch = getCommandLatch(); + client.oneway(expected); + latch.await(2, TimeUnit.SECONDS); + + assertEquals(1, serverQueue.size()); + ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0); + + assertEquals(expected.getHeader(), actual.getHeader()); + assertEquals(expected.getPayload(), actual.getPayload()); + + } + + + private void readFully(InputStream ais, byte[] actualPayload) throws IOException { + int pos = 0; + int c; + while( pos < actualPayload.length && (c=ais.read(actualPayload, pos, actualPayload.length-pos))>=0 ) { + pos += c; + } + if( pos < actualPayload.length ) { + throw new EOFException(); + } + } +} Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java?rev=712217&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreBrokerTest.java Fri Nov 7 10:16:10 2008 @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.store; + +import java.io.File; +import java.net.URI; + +import junit.framework.Test; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerTest; + +/** + * Once the wire format is completed we can test against real persistence storage. 
+ * + * @version $Revision$ + */ +public class KahaDBStoreBrokerTest extends BrokerTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new File("target/activemq-data/kahadb")); + kaha.deleteAllMessages(); + broker.setPersistenceAdapter(kaha); + return broker; + } + + protected BrokerService createRestartedBroker() throws Exception { + BrokerService broker = new BrokerService(); + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new File("target/activemq-data/kahadb")); + broker.setPersistenceAdapter(kaha); + return broker; + } + + + public static Test suite() { + return suite(KahaDBStoreBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java?rev=712217&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java Fri Nov 7 10:16:10 2008 @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.store; + +import java.io.File; +import java.net.URI; +import java.util.ArrayList; + +import junit.framework.Test; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.RecoveryBrokerTest; +import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; + + +/** + * Used to verify that recovery works correctly against the KahaDBStore persistence adapter. + * + * @version $Revision$ + */ +public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new File("target/activemq-data/kahadb")); + kaha.deleteAllMessages(); + broker.setPersistenceAdapter(kaha); + return broker; + } + + protected BrokerService createRestartedBroker() throws Exception { + BrokerService broker = new BrokerService(); + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new File("target/activemq-data/kahadb")); + broker.setPersistenceAdapter(kaha); + return broker; + } + + public static Test suite() { + return 
suite(KahaDBStoreRecoveryBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + + public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception { + + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + ArrayList<String> expected = new ArrayList<String>(); + + int MESSAGE_COUNT = 10000; + for(int i=0; i < MESSAGE_COUNT; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + expected.add(message.getMessageId().toString()); + } + connection.request(closeConnectionInfo(connectionInfo)); + + // restart the broker. + restartBroker(); + + // Setup the consumer and receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + producerInfo = createProducerInfo(sessionInfo); + connection.send(producerInfo); + + for(int i=0; i < MESSAGE_COUNT/2; i++) { + Message m = receiveMessage(connection); + assertNotNull("Should have received message "+expected.get(0)+" by now!", m); + assertEquals(expected.remove(0), m.getMessageId().toString()); + MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE); + connection.send(ack); + } + + connection.request(closeConnectionInfo(connectionInfo)); + + // restart the broker. 
+ restartBroker(); + + // Setup the consumer and receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + for(int i=0; i < MESSAGE_COUNT/2; i++) { + Message m = receiveMessage(connection); + assertNotNull("Should have received message "+expected.get(i)+" by now!", m); + assertEquals(expected.get(i), m.getMessageId().toString()); + MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE); + connection.send(ack); + + + } + + connection.request(closeConnectionInfo(connectionInfo)); + } +} Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java?rev=712217&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreXARecoveryBrokerTest.java Fri Nov 7 10:16:10 2008 @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.store; + +import java.io.File; +import java.net.URI; + +import junit.framework.Test; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.XARecoveryBrokerTest; + +/** + * Used to verify that recovery works correctly against a broker backed by a KahaDBStore persistence adapter. + * + * @version $Revision$ + */ +public class KahaDBStoreXARecoveryBrokerTest extends XARecoveryBrokerTest { + + public static Test suite() { + return suite(KahaDBStoreXARecoveryBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new File("target/activemq-data/kahadb")); + kaha.deleteAllMessages(); + broker.setPersistenceAdapter(kaha); + return broker; + } + + protected BrokerService createRestartedBroker() throws Exception { + BrokerService broker = new BrokerService(); + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new File("target/activemq-data/kahadb")); + broker.setPersistenceAdapter(kaha); + return broker; + } + +} Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java?rev=712217&r1=712216&r2=712217&view=diff ============================================================================== --- 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java (original) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java Fri Nov 7 10:16:10 2008 @@ -41,7 +41,7 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.kahadb.store.KahaDBPersistenceAdaptor; +import org.apache.kahadb.store.KahaDBStore; /** * This tests bulk loading and unloading of messages to a Queue.s @@ -56,7 +56,7 @@ protected BrokerService createBroker() throws Exception { BrokerService broker = new BrokerService(); - KahaDBPersistenceAdaptor kaha = new KahaDBPersistenceAdaptor(); + KahaDBStore kaha = new KahaDBStore(); kaha.setDirectory(new File("target/activemq-data/kahadb")); // kaha.deleteAllMessages(); broker.setPersistenceAdapter(kaha); Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java?rev=712217&r1=712216&r2=712217&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java (original) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java Fri Nov 7 10:16:10 2008 @@ -19,7 +19,7 @@ import java.io.File; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.perf.SimpleDurableTopicTest; -import org.apache.kahadb.store.KahaDBPersistenceAdaptor; +import org.apache.kahadb.store.KahaDBStore; /** * @version $Revision: 1.3 $ @@ -31,7 +31,7 @@ dataFileDir.mkdirs(); // answer.setDeleteAllMessagesOnStartup(true); - KahaDBPersistenceAdaptor adaptor = new KahaDBPersistenceAdaptor(); + KahaDBStore adaptor = new KahaDBStore(); 
adaptor.setDirectory(dataFileDir); Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java?rev=712217&r1=712216&r2=712217&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java (original) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreQueueTest.java Fri Nov 7 10:16:10 2008 @@ -19,7 +19,7 @@ import java.io.File; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.perf.SimpleQueueTest; -import org.apache.kahadb.store.KahaDBPersistenceAdaptor; +import org.apache.kahadb.store.KahaDBStore; /** * @version $Revision: 1.3 $ @@ -31,7 +31,7 @@ dataFileDir.mkdirs(); answer.setDeleteAllMessagesOnStartup(true); - KahaDBPersistenceAdaptor adaptor = new KahaDBPersistenceAdaptor(); + KahaDBStore adaptor = new KahaDBStore(); adaptor.setDirectory(dataFileDir);
