Author: chirino
Date: Mon Sep 22 13:17:59 2008
New Revision: 697971
URL: http://svn.apache.org/viewvc?rev=697971&view=rev
Log:
adding a test to verify that the store works well even when loaded with lots of
messages.
Modified:
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java
Modified:
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java?rev=697971&r1=697970&r2=697971&view=diff
==============================================================================
---
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java
(original)
+++
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaStoreRecoveryBrokerTest.java
Mon Sep 22 13:17:59 2008
@@ -18,12 +18,22 @@
import java.io.File;
import java.net.URI;
+import java.util.ArrayList;
import junit.framework.Test;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.RecoveryBrokerTest;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
/**
@@ -58,4 +68,77 @@
junit.textui.TestRunner.run(suite());
}
+
+ public void testLargeQueuePersistentMessagesNotLostOnRestart() throws
Exception {
+ // Scenario: send MESSAGE_COUNT persistent messages to one queue, restart the broker, consume and ack the first half, restart again, then consume and ack the second half, checking message ids in send order.
+ ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+ // Set up the producer connection and send the messages.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ ArrayList<String> expected = new ArrayList<String>();
+
+ int MESSAGE_COUNT = 10000; // total messages sent; each of the two consume phases below reads MESSAGE_COUNT/2
+ for(int i=0; i < MESSAGE_COUNT; i++) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ connection.send(message);
+ expected.add(message.getMessageId().toString()); // remember the ids in send order
+ }
+ connection.request(closeConnectionInfo(connectionInfo)); // close the producer connection before restarting (request presumably waits for the broker reply - confirm in StubConnection)
+
+ // Restart the broker; the assertions below expect the messages sent above to still be delivered.
+ restartBroker();
+
+ // Set up a consumer on a fresh connection and receive the first half of the messages.
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo,
destination);
+ connection.send(consumerInfo);
+ producerInfo = createProducerInfo(sessionInfo); // NOTE(review): this producer is registered but nothing visible in this method sends with it
+ connection.send(producerInfo);
+ // First half: each received id must equal the head of expected, which is then removed from the list.
+ for(int i=0; i < MESSAGE_COUNT/2; i++) {
+ Message m = receiveMessage(connection);
+ assertNotNull("Should have received message "+expected.get(0)+" by
now!", m);
+ assertEquals(expected.remove(0), m.getMessageId().toString());
+ MessageAck ack = createAck(consumerInfo, m, 1,
MessageAck.STANDARD_ACK_TYPE);
+ connection.send(ack);
+ }
+
+ connection.request(closeConnectionInfo(connectionInfo));
+
+ // Restart the broker again; the next ids asserted are the ones left in expected, so the acked first half must not reappear.
+ restartBroker();
+
+ // Set up a new consumer on a fresh connection and receive the remaining messages.
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+ // Second half: expected now holds only the unconsumed ids, so it is indexed by i here instead of being drained.
+ for(int i=0; i < MESSAGE_COUNT/2; i++) {
+ Message m = receiveMessage(connection);
+ assertNotNull("Should have received message "+expected.get(i)+" by
now!", m);
+ assertEquals(expected.get(i), m.getMessageId().toString());
+ MessageAck ack = createAck(consumerInfo, m, 1,
MessageAck.STANDARD_ACK_TYPE);
+ connection.send(ack);
+
+
+ }
+
+ connection.request(closeConnectionInfo(connectionInfo));
+ }
}