Added: activemq/trunk/activemq-core/src/main/proto/journal-data.proto URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/proto/journal-data.proto?rev=731704&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/proto/journal-data.proto (added) +++ activemq/trunk/activemq-core/src/main/proto/journal-data.proto Mon Jan 5 12:48:38 2009 @@ -0,0 +1,149 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package org.apache.activemq.store.kahadb.data;

option java_multiple_files = true;
option java_outer_classname = "JournalData";

// One value per journal command message declared in this file. Every command
// message names this enum in its java_type_method option.
enum KahaEntryType {
  //| option java_create_message="true";
  KAHA_TRACE_COMMAND = 0;
  KAHA_ADD_MESSAGE_COMMAND = 1;
  KAHA_REMOVE_MESSAGE_COMMAND = 2;
  KAHA_PREPARE_COMMAND = 3;
  KAHA_COMMIT_COMMAND = 4;
  KAHA_ROLLBACK_COMMAND = 5;
  KAHA_REMOVE_DESTINATION_COMMAND = 6;
  KAHA_SUBSCRIPTION_COMMAND = 7;
}

// Journal entry that carries a free-form text message.
message KahaTraceCommand {
  // We make use of the wonky comment style below because the following options
  // are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
  // In the ActiveMQ proto compiler, comments terminate with the pipe character: |
  // NOTE(review): the option name java_implments is misspelled but is kept
  // byte-for-byte; presumably it is the spelling the ActiveMQ compiler expects.

  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaTraceCommand>";
  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
  //| option java_type_method = "KahaEntryType";

  required string message = 1;
}

// Journal entry that adds a message to a destination, optionally as part of a
// transaction.
message KahaAddMessageCommand {
  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAddMessageCommand>";
  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
  //| option java_type_method = "KahaEntryType";

  optional KahaTransactionInfo transaction_info=1;
  required KahaDestination destination = 2;
  required string messageId = 3;
  required bytes message = 4;
}

// Journal entry that removes (acknowledges) a message from a destination,
// optionally as part of a transaction.
message KahaRemoveMessageCommand {
  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveMessageCommand>";
  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
  //| option java_type_method = "KahaEntryType";

  optional KahaTransactionInfo transaction_info=1;
  required KahaDestination destination = 2;
  required string messageId = 3;
  optional bytes ack = 4;
  optional string subscriptionKey = 5;  // Set if it is a topic ack.
}

// Journal entry that records the prepare of the referenced transaction.
message KahaPrepareCommand {
  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaPrepareCommand>";
  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
  //| option java_type_method = "KahaEntryType";

  required KahaTransactionInfo transaction_info=1;
}

// Journal entry that records the commit of the referenced transaction.
message KahaCommitCommand {
  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaCommitCommand>";
  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
  //| option java_type_method = "KahaEntryType";

  required KahaTransactionInfo transaction_info=1;
}

// Journal entry that records the rollback of the referenced transaction.
message KahaRollbackCommand {
  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRollbackCommand>";
  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
  //| option java_type_method = "KahaEntryType";

  required KahaTransactionInfo transaction_info=1;
}

// Journal entry that removes a destination.
message KahaRemoveDestinationCommand {
  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveDestinationCommand>";
  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
  //| option java_type_method = "KahaEntryType";

  required KahaDestination destination = 1;
}

// Journal entry describing a subscription on a destination.
// NOTE(review): how add and remove of a subscription are told apart is not
// visible in this file; presumably by the presence of subscriptionInfo - confirm.
message KahaSubscriptionCommand {
  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaSubscriptionCommand>";
  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
  //| option java_type_method = "KahaEntryType";

  required KahaDestination destination = 1;
  required string subscriptionKey = 2;
  optional bool retroactive = 3;
  optional bytes subscriptionInfo = 4;
}

// Identifies a destination by kind and name.
message KahaDestination {
  enum DestinationType {
    QUEUE = 0;
    TOPIC = 1;
    TEMP_QUEUE = 2;
    TEMP_TOPIC = 3;
  }

  required DestinationType type = 1 [default = QUEUE];
  required string name = 2;
}

// Transaction context attached to a command. Either a local or an XA
// transaction id may be set, plus an optional pointer to an earlier journal
// entry. The misspelled field names are kept because the generated accessor
// names are derived from them.
message KahaTransactionInfo {
  optional KahaLocalTransactionId local_transaciton_id=1;
  optional KahaXATransactionId xa_transaciton_id=2;
  optional KahaLocation previous_entry=3;
}

// Identifier of a local (non-XA) transaction.
message KahaLocalTransactionId {
  required string connection_id=1;
  // Was declared with field number 1, colliding with connection_id. Field
  // numbers must be unique within a message, so this field now uses 2. The
  // (misspelled) name is kept so generated accessors do not change.
  required int64 transaciton_id=2;
}

// Identifier of an XA transaction: format id, branch qualifier and global
// transaction id.
message KahaXATransactionId {
  required int32 format_id = 1;
  required bytes branch_qualifier = 2;
  required bytes global_transaction_id = 3;
}

// Position of a journal entry: a log id plus an offset.
message KahaLocation {
  required int32 log_id = 1;
  required int32 offset = 2;
}

// TODO things to ponder
// should we move more message fields
// that are set by the sender (and rarely required by the broker)
// into the Properties object?
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java?rev=731704&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java Mon Jan 5 12:48:38 2009 @@ -0,0 +1,61 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import java.io.File;
import java.net.URI;

import junit.framework.Test;

import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerTest;

/**
 * Runs the inherited {@link BrokerTest} cases against a broker whose
 * persistence adapter is a {@link KahaDBStore}.
 *
 * @version $Revision: 712224 $
 */
public class KahaDBStoreBrokerTest extends BrokerTest {

    /** Directory handed to every store created by this test. */
    private static final String STORE_DIR = "target/activemq-data/kahadb";

    public static void main(String[] args) {
        junit.textui.TestRunner.run(suite());
    }

    public static Test suite() {
        return suite(KahaDBStoreBrokerTest.class);
    }

    /**
     * Creates a broker on a store that has had all messages deleted first.
     */
    protected BrokerService createBroker() throws Exception {
        return attachStore(new BrokerService(), true);
    }

    /**
     * Creates a broker on the same directory without deleting anything.
     */
    protected BrokerService createRestartedBroker() throws Exception {
        return attachStore(new BrokerService(), false);
    }

    /**
     * Builds a store on {@link #STORE_DIR}, optionally wipes it, and installs
     * it as the persistence adapter of the given broker.
     */
    private static BrokerService attachStore(BrokerService service, boolean wipe) throws Exception {
        KahaDBStore store = new KahaDBStore();
        store.setDirectory(new File(STORE_DIR));
        if (wipe) {
            store.deleteAllMessages();
        }
        service.setPersistenceAdapter(store);
        return service;
    }

}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java?rev=731704&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java Mon Jan 5 12:48:38 2009 @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.store.kahadb;

import java.io.File;
import java.net.URI;
import java.util.ArrayList;

import junit.framework.Test;

import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.RecoveryBrokerTest;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;


/**
 * Runs the inherited {@link RecoveryBrokerTest} cases, plus one large-queue
 * restart case, against a broker backed by a {@link KahaDBStore}.
 *
 * @version $Revision: 712224 $
 */
public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {

    public static void main(String[] args) {
        junit.textui.TestRunner.run(suite());
    }

    public static Test suite() {
        return suite(KahaDBStoreRecoveryBrokerTest.class);
    }

    /** Broker on a store that is emptied before use. */
    protected BrokerService createBroker() throws Exception {
        BrokerService service = new BrokerService();
        KahaDBStore store = new KahaDBStore();
        File storeDir = new File("target/activemq-data/kahadb");
        store.setDirectory(storeDir);
        store.deleteAllMessages();
        service.setPersistenceAdapter(store);
        return service;
    }

    /** Broker on the same directory, keeping whatever is already stored. */
    protected BrokerService createRestartedBroker() throws Exception {
        BrokerService service = new BrokerService();
        KahaDBStore store = new KahaDBStore();
        File storeDir = new File("target/activemq-data/kahadb");
        store.setDirectory(storeDir);
        service.setPersistenceAdapter(store);
        return service;
    }

    /**
     * Sends 10000 persistent messages to a queue, restarts the broker,
     * consumes and acks the first half in order, restarts again, and then
     * expects the second half in order.
     */
    public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception {

        final int total = 10000;
        final int half = total / 2;
        ActiveMQDestination queue = new ActiveMQQueue("TEST");

        // Phase 1: register a producer and publish every message.
        StubConnection link = createConnection();
        ConnectionInfo linkInfo = createConnectionInfo();
        SessionInfo session = createSessionInfo(linkInfo);
        ProducerInfo producer = createProducerInfo(session);
        link.send(linkInfo);
        link.send(session);
        link.send(producer);

        // Ids in send order; consumed ids are removed from the front in phase 2.
        ArrayList<String> outstanding = new ArrayList<String>();

        int sent = 0;
        while (sent < total) {
            Message outbound = createMessage(producer, queue);
            outbound.setPersistent(true);
            link.send(outbound);
            outstanding.add(outbound.getMessageId().toString());
            sent++;
        }
        link.request(closeConnectionInfo(linkInfo));

        restartBroker();

        // Phase 2: consume and ack the first half.
        link = createConnection();
        linkInfo = createConnectionInfo();
        session = createSessionInfo(linkInfo);
        link.send(linkInfo);
        link.send(session);
        ConsumerInfo consumer = createConsumerInfo(session, queue);
        link.send(consumer);
        producer = createProducerInfo(session);
        link.send(producer);

        int received = 0;
        while (received < half) {
            Message inbound = receiveMessage(link);
            assertNotNull("Should have received message "+outstanding.get(0)+" by now!", inbound);
            assertEquals(outstanding.remove(0), inbound.getMessageId().toString());
            link.send(createAck(consumer, inbound, 1, MessageAck.STANDARD_ACK_TYPE));
            received++;
        }

        link.request(closeConnectionInfo(linkInfo));

        restartBroker();

        // Phase 3: the remaining ids now start at index 0 of the list.
        link = createConnection();
        linkInfo = createConnectionInfo();
        session = createSessionInfo(linkInfo);
        link.send(linkInfo);
        link.send(session);
        consumer = createConsumerInfo(session, queue);
        link.send(consumer);

        int index = 0;
        while (index < half) {
            Message inbound = receiveMessage(link);
            assertNotNull("Should have received message "+outstanding.get(index)+" by now!", inbound);
            assertEquals(outstanding.get(index), inbound.getMessageId().toString());
            link.send(createAck(consumer, inbound, 1, MessageAck.STANDARD_ACK_TYPE));
            index++;
        }

        link.request(closeConnectionInfo(linkInfo));
    }
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreXARecoveryBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreXARecoveryBrokerTest.java?rev=731704&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreXARecoveryBrokerTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreXARecoveryBrokerTest.java Mon Jan 5 12:48:38 2009 @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.store.kahadb;

import java.io.File;
import java.net.URI;

import junit.framework.Test;

import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.XARecoveryBrokerTest;

/**
 * Runs the inherited {@link XARecoveryBrokerTest} cases against a broker
 * backed by a {@link KahaDBStore}.
 *
 * @version $Revision: 712224 $
 */
public class KahaDBStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {

    public static void main(String[] args) {
        junit.textui.TestRunner.run(suite());
    }

    public static Test suite() {
        return suite(KahaDBStoreXARecoveryBrokerTest.class);
    }

    /** Fresh broker: the store is emptied before it is attached. */
    protected BrokerService createBroker() throws Exception {
        final BrokerService result = new BrokerService();
        final KahaDBStore persistence = new KahaDBStore();
        persistence.setDirectory(new File("target/activemq-data/kahadb"));
        persistence.deleteAllMessages();
        result.setPersistenceAdapter(persistence);
        return result;
    }

    /** Restarted broker: same directory, existing contents kept. */
    protected BrokerService createRestartedBroker() throws Exception {
        final BrokerService result = new BrokerService();
        final KahaDBStore persistence = new KahaDBStore();
        persistence.setDirectory(new File("target/activemq-data/kahadb"));
        result.setPersistenceAdapter(persistence);
        return result;
    }

}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java?rev=731704&view=auto ============================================================================== --- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java Mon Jan 5 12:48:38 2009 @@ -0,0 +1,56 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import java.io.IOException;

import junit.framework.TestCase;

import org.apache.activemq.protobuf.Buffer;
import org.apache.kahadb.journal.Location;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;

/**
 * Round-trip test for the generated journal command classes.
 */
public class PBMesssagesTest extends TestCase {

    /**
     * Writes a {@link KahaAddMessageCommand} as one type byte followed by the
     * framed message, reads it back through {@link KahaEntryType}, and expects
     * the decoded command to equal the original.
     */
    public void testKahaAddMessageCommand() throws IOException {

        KahaDestination target = new KahaDestination().setName("Foo").setType(DestinationType.QUEUE);
        byte[] payload = new byte[] {1,2,3,4,5,6};

        KahaAddMessageCommand original = new KahaAddMessageCommand();
        original.setDestination(target);
        original.setMessage(new Buffer(payload));
        original.setMessageId("Hello World");

        // Encode: the extra byte of capacity is for the entry-type prefix.
        int framedSize = original.serializedSizeFramed();
        DataByteArrayOutputStream out = new DataByteArrayOutputStream(framedSize + 1);
        out.writeByte(original.type().getNumber());
        original.writeFramed(out);
        ByteSequence encoded = out.toByteSequence();

        // Decode: the prefix byte selects which message class to create.
        DataByteArrayInputStream in = new DataByteArrayInputStream(encoded);
        KahaEntryType entryType = KahaEntryType.valueOf(in.readByte());
        JournalCommand decoded = (JournalCommand)entryType.createMessage();
        decoded.mergeFramed(in);

        assertEquals(original, decoded);
    }

}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java?rev=731704&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java (added) +++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java Mon Jan 5 12:48:38 2009 @@ -0,0 +1,154 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.perf;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import junit.framework.Test;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProgressPrinter;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.activemq.store.kahadb.KahaDBStore;

/**
 * This tests bulk loading and unloading of messages to a Queue.
 *
 * The single test first drains whatever is already in the queue (the store is
 * deliberately not wiped in {@link #createBroker()}), then produces persistent
 * messages until the JVM shutdown hook fires. Throughput is logged in windows
 * of roughly five seconds.
 *
 * @version $Revision: 712224 $
 */
public class KahaBulkLoadingTest extends JmsTestSupport {

    private static final Log LOG = LogFactory.getLog(KahaBulkLoadingTest.class);

    /** Body size, in bytes, of every message produced. */
    protected int messageSize = 1024 * 4;

    /**
     * Creates a broker backed by a KahaDBStore with one TCP connector on an
     * ephemeral port. Existing messages are kept so that a later run can
     * consume what an earlier run produced.
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(new File("target/activemq-data/kahadb"));
        // kaha.deleteAllMessages();
        broker.setPersistenceAdapter(kaha);
        broker.addConnector("tcp://localhost:0");
        return broker;
    }

    /**
     * Connects to the broker's first transport connector with async send on.
     */
    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(((TransportConnector)broker.getTransportConnectors().get(0)).getServer().getConnectURI());
        factory.setUseAsyncSend(true);
        return factory;
    }

    /**
     * Drains the queue, then produces messages until JVM shutdown is requested.
     */
    public void testQueueSendThenAddConsumer() throws Exception {
        long start;
        long end;
        ActiveMQDestination destination = new ActiveMQQueue("TEST");

        connection.setUseCompression(false);
        connection.getPrefetchPolicy().setAll(10);
        connection.start();

        Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);

        LOG.info("Receiving messages that are in the queue");
        MessageConsumer consumer = session.createConsumer(destination);
        BytesMessage msg = (BytesMessage)consumer.receive(2000);
        int consumed = 0;
        if( msg!=null ) {
            consumed++;
        }
        while (true) {
            int counter = 0;
            if (msg == null) {
                break;
            }
            end = start = System.currentTimeMillis();
            // long, not int: getBodyLength() returns long, and "int += long"
            // narrows silently, so a large messageSize could wrap the total.
            long size = 0;
            while ((end - start) < 5000) {
                msg = (BytesMessage)consumer.receive(5000);
                if (msg == null) {
                    break;
                }
                counter++;
                consumed++;
                end = System.currentTimeMillis();
                size += msg.getBodyLength();
            }
            LOG.info("Consumed: " + (counter * 1000.0 / (end - start)) + " " + " messages/sec, " + (1.0 * size / (1024.0 * 1024.0)) * ((1000.0 / (end - start))) + " megs/sec ");
        }
        consumer.close();
        LOG.info("Consumed " + consumed + " messages from the queue.");

        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        LOG.info("Sending messages that are " + (messageSize / 1024.0) + "k large");
        // Send a message to the broker.
        start = System.currentTimeMillis();

        // The produce loop only ends when the JVM starts shutting down.
        final AtomicBoolean stop = new AtomicBoolean();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                stop.set(true);
            }
        });

        int produced = 0;
        while (!stop.get()) {
            end = start = System.currentTimeMillis();
            int produceCount = 0;
            while ((end - start) < 5000 && !stop.get()) {
                BytesMessage bm = session.createBytesMessage();
                bm.writeBytes(new byte[messageSize]);
                producer.send(bm);
                produceCount++;
                produced++;
                end = System.currentTimeMillis();
            }
            LOG.info("Produced: " + (produceCount * 1000.0 / (end - start)) + " messages/sec, " + (1.0 * produceCount * messageSize / (1024.0 * 1024.0)) * ((1000.0 / (end - start))) + " megs/sec");
        }
        LOG.info("Produced " + produced + " messages to the queue.");

    }

    public static Test suite() {
        return suite(KahaBulkLoadingTest.class);
    }

    public static void main(String[] args) {
        junit.textui.TestRunner.run(suite());
    }

}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java?rev=731704&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java Mon Jan 5 12:48:38 2009 @@ -0,0 +1,42 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.store.kahadb.perf;

import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.perf.SimpleDurableTopicTest;
import org.apache.activemq.store.kahadb.KahaDBStore;

/**
 * Runs {@link SimpleDurableTopicTest} with a {@link KahaDBStore} as the
 * broker's persistence adapter.
 *
 * @version $Revision: 712224 $
 */
public class KahaStoreDurableTopicTest extends SimpleDurableTopicTest {

    /**
     * Points the broker's data directory and a new KahaDBStore at the same
     * directory, then opens a connector on the given URI. Existing messages
     * are not deleted on startup.
     *
     * @param answer the broker to configure
     * @param uri    the connector URI to add
     */
    protected void configureBroker(BrokerService answer,String uri) throws Exception {
        final File storeRoot = new File("target/test-amq-data/perfTest/amqdb");
        storeRoot.mkdirs();
        // answer.setDeleteAllMessagesOnStartup(true);

        final KahaDBStore store = new KahaDBStore();
        store.setDirectory(storeRoot);

        answer.setDataDirectoryFile(storeRoot);
        answer.setPersistenceAdapter(store);
        answer.addConnector(uri);
    }
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java?rev=731704&view=auto ============================================================================== --- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java Mon Jan 5 12:48:38 2009 @@ -0,0 +1,44 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.perf;

import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.perf.SimpleQueueTest;
import org.apache.activemq.store.kahadb.KahaDBStore;

/**
 * Runs {@link SimpleQueueTest} with a {@link KahaDBStore} as the broker's
 * persistence adapter.
 *
 * @version $Revision: 712224 $
 */
public class KahaStoreQueueTest extends SimpleQueueTest {

    /**
     * Asks the broker to delete all messages on startup, points its data
     * directory and a new KahaDBStore at the same directory, then opens a
     * connector on the given URI.
     *
     * @param answer the broker to configure
     * @param uri    the connector URI to add
     */
    protected void configureBroker(BrokerService answer,String uri) throws Exception {
        final File storeRoot = new File("target/test-amq-data/perfTest/amqdb");
        storeRoot.mkdirs();
        answer.setDeleteAllMessagesOnStartup(true);

        final KahaDBStore store = new KahaDBStore();
        store.setDirectory(storeRoot);

        answer.setDataDirectoryFile(storeRoot);
        answer.setPersistenceAdapter(store);
        answer.addConnector(uri);
    }

}

Modified: activemq/trunk/kahadb/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/pom.xml?rev=731704&r1=731703&r2=731704&view=diff ============================================================================== --- activemq/trunk/kahadb/pom.xml (original) +++ activemq/trunk/kahadb/pom.xml Mon Jan 5 12:48:38 2009 @@ -52,78 +52,27 @@ </exclusion> </exclusions> </dependency> + <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.14</version> - <scope>compile</scope> + <scope>test</scope> <optional>true</optional> </dependency> <dependency> - <groupId>org.apache.activemq.protobuf</groupId> - <artifactId>activemq-protobuf</artifactId> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.xbean</groupId> - <artifactId>xbean-spring</artifactId> - <optional>true</optional> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-core</artifactId> - <optional>true</optional> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-beans</artifactId> - <optional>true</optional> - </dependency> - 
<dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-context</artifactId> - <optional>true</optional> - </dependency> - <dependency> - <groupId>org.apache.hadoop.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <optional>true</optional> - </dependency> - - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-core</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activeblaze</artifactId> - <version>1.0-SNAPSHOT</version> - <optional>true</optional> - </dependency> - + </dependencies> - - <repositories> - <repository> - <id>chirino-zk-repo</id> - <name>Private ZooKeeper Repo</name> - <url>http://people.apache.org/~chirino/zk-repo/</url> - </repository> - </repositories> + <build> <plugins> + <!-- <plugin> <groupId>org.apache.xbean</groupId> <artifactId>maven-xbean-plugin</artifactId> @@ -139,6 +88,7 @@ </execution> </executions> </plugin> + --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> @@ -148,17 +98,6 @@ </configuration> </plugin> <plugin> - <groupId>org.apache.activemq.protobuf</groupId> - <artifactId>activemq-protobuf</artifactId> - <executions> - <execution> - <goals> - <goal>compile</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> <artifactId>maven-surefire-plugin</artifactId> <configuration> <forkMode>pertest</forkMode>
