http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java new file mode 100644 index 0000000..491a585 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java @@ -0,0 +1,586 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.activemq.broker;

import java.util.ArrayList;

import javax.jms.DeliveryMode;

import junit.framework.Test;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.XATransactionId;

/**
 * Used to simulate the recovery that occurs when a broker shuts down.
 *
 * Every test talks to the broker through a {@code StubConnection}, calls
 * {@code restartBroker()} part-way through, and then checks which messages
 * can still be received.
 */
public class RecoveryBrokerTest extends BrokerRestartTestSupport {

    /**
     * Used to verify that after a broker restart durable subscriptions that use
     * wild cards are still wild card subscriptions.
     *
     * The subscription on {@code TEST.>} is created and closed before the
     * restart; messages are then sent to TEST.A (before the restart), TEST.B
     * (after the restart, before the subscription is re-opened) and TEST.C
     * (after it is re-opened). All twelve are expected back in that order.
     *
     * NOTE(review): the method name starts with "X" instead of "test",
     * presumably so the JUnit 3 runner does not pick it up - needs to be
     * revisited before it is re-enabled.
     *
     * @throws Exception if any broker interaction fails
     */
    public void XtestWildCardSubscriptionPreservedOnRestart() throws Exception {
        ActiveMQDestination dest1 = new ActiveMQTopic("TEST.A");
        ActiveMQDestination dest2 = new ActiveMQTopic("TEST.B");
        ActiveMQDestination dest3 = new ActiveMQTopic("TEST.C");
        ActiveMQDestination wildDest = new ActiveMQTopic("TEST.>");

        ArrayList<MessageId> sentBeforeRestart = new ArrayList<MessageId>();
        ArrayList<MessageId> sentBeforeCreateConsumer = new ArrayList<MessageId>();
        ArrayList<MessageId> sentAfterCreateConsumer = new ArrayList<MessageId>();

        // Setup a first connection
        {
            StubConnection connection1 = createConnection();
            ConnectionInfo connectionInfo1 = createConnectionInfo();
            connectionInfo1.setClientId("A");
            SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
            ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
            connection1.send(connectionInfo1);
            connection1.send(sessionInfo1);
            connection1.send(producerInfo1);

            // Create the durable subscription.
            ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, wildDest);
            consumerInfo1.setSubscriptionName("test");
            consumerInfo1.setPrefetchSize(100);
            connection1.send(consumerInfo1);

            // Close the subscription.
            connection1.send(closeConsumerInfo(consumerInfo1));

            // Send the messages
            for (int i = 0; i < 4; i++) {
                Message m = createMessage(producerInfo1, dest1, DeliveryMode.PERSISTENT);
                connection1.send(m);
                sentBeforeRestart.add(m.getMessageId());
            }
            connection1.request(closeConnectionInfo(connectionInfo1));
            connection1.stop();
        }

        // Restart the broker.
        restartBroker();

        // Get a connection to the new broker.
        {
            StubConnection connection2 = createConnection();
            ConnectionInfo connectionInfo2 = createConnectionInfo();
            connectionInfo2.setClientId("A");
            SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
            connection2.send(connectionInfo2);
            connection2.send(sessionInfo2);

            ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
            connection2.send(producerInfo2);

            // Send messages before the durable subscription is re-activated.
            for (int i = 0; i < 4; i++) {
                Message m = createMessage(producerInfo2, dest2, DeliveryMode.PERSISTENT);
                connection2.send(m);
                sentBeforeCreateConsumer.add(m.getMessageId());
            }

            // Re-open the subscription.
            ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, wildDest);
            consumerInfo2.setSubscriptionName("test");
            consumerInfo2.setPrefetchSize(100);
            connection2.send(consumerInfo2);

            // Send messages after the subscription is activated.
            for (int i = 0; i < 4; i++) {
                Message m = createMessage(producerInfo2, dest3, DeliveryMode.PERSISTENT);
                connection2.send(m);
                sentAfterCreateConsumer.add(m.getMessageId());
            }

            // We should get the recovered messages...
            for (int i = 0; i < 4; i++) {
                Message m2 = receiveMessage(connection2);
                assertNotNull("Recovered message missing: " + i, m2);
                assertEquals(sentBeforeRestart.get(i), m2.getMessageId());
            }

            // We should get the messages that were sent before the sub was
            // reactivated.
            for (int i = 0; i < 4; i++) {
                Message m2 = receiveMessage(connection2);
                assertNotNull("Before activated message missing: " + i, m2);
                assertEquals(sentBeforeCreateConsumer.get(i), m2.getMessageId());
            }

            // We should get the messages that were sent after the sub was
            // reactivated.
            for (int i = 0; i < 4; i++) {
                Message m2 = receiveMessage(connection2);
                assertNotNull("After activated message missing: " + i, m2);
                assertEquals("" + i, sentAfterCreateConsumer.get(i), m2.getMessageId());
            }

            assertNoMessagesLeft(connection2);
        }

    }

    /**
     * Sends four persistent queue messages, receives all four on a second
     * connection, restarts the broker, and asserts that nothing further
     * arrives on that same (pre-restart) connection.
     *
     * @throws Exception if any broker interaction fails
     */
    public void testConsumedQueuePersistentMessagesLostOnRestart() throws Exception {

        ActiveMQDestination destination = new ActiveMQQueue("TEST");

        // Setup the producer and send the message.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);

        for (int i = 0; i < 4; i++) {
            Message message = createMessage(producerInfo, destination);
            message.setPersistent(true);
            connection.send(message);
        }

        // Setup the consumer and receive the message.
        connection = createConnection();
        connectionInfo = createConnectionInfo();
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        // Then we should get the messages.
        for (int i = 0; i < 4; i++) {
            Message m2 = receiveMessage(connection);
            assertNotNull(m2);
        }

        // restart the broker.
        restartBroker();

        // No messages should be delivered.
        Message m = receiveMessage(connection);
        assertNull(m);
    }

    /**
     * Sends four persistent queue messages inside a local transaction that is
     * never committed, restarts the broker, and asserts that a fresh consumer
     * receives nothing.
     *
     * @throws Exception if any broker interaction fails
     */
    public void testQueuePersistentUncommitedMessagesLostOnRestart() throws Exception {

        ActiveMQDestination destination = new ActiveMQQueue("TEST");

        // Setup the producer and send the message.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);

        // Begin the transaction.
        LocalTransactionId txid = createLocalTransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));

        for (int i = 0; i < 4; i++) {
            Message message = createMessage(producerInfo, destination);
            message.setPersistent(true);
            message.setTransactionId(txid);
            connection.send(message);
        }

        // Don't commit

        // restart the broker.
        restartBroker();

        // Setup the consumer and receive the message.
        connection = createConnection();
        connectionInfo = createConnectionInfo();
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        // No messages should be delivered.
        Message m = receiveMessage(connection);
        assertNull(m);
    }

    /**
     * Creates and closes a durable topic subscription, sends four persistent
     * messages to the topic, restarts the broker, re-opens the subscription
     * under the same client id and name, and expects exactly those four
     * messages.
     *
     * @throws Exception if any broker interaction fails
     */
    public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart() throws Exception {

        ActiveMQDestination destination = new ActiveMQTopic("TEST");

        // Setup a first connection
        StubConnection connection1 = createConnection();
        ConnectionInfo connectionInfo1 = createConnectionInfo();
        connectionInfo1.setClientId("A");
        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
        connection1.send(connectionInfo1);
        connection1.send(sessionInfo1);
        connection1.send(producerInfo1);

        // Create the durable subscription.
        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
        consumerInfo1.setSubscriptionName("test");
        consumerInfo1.setPrefetchSize(100);
        connection1.send(consumerInfo1);

        // Close the subscription.
        connection1.send(closeConsumerInfo(consumerInfo1));

        // Send the messages
        for (int i = 0; i < 4; i++) {
            connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT));
        }
        connection1.request(closeConnectionInfo(connectionInfo1));

        // Restart the broker.
        restartBroker();

        // Get a connection to the new broker.
        StubConnection connection2 = createConnection();
        ConnectionInfo connectionInfo2 = createConnectionInfo();
        connectionInfo2.setClientId("A");
        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
        connection2.send(connectionInfo2);
        connection2.send(sessionInfo2);

        // Re-open the subscription.
        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
        consumerInfo2.setSubscriptionName("test");
        consumerInfo2.setPrefetchSize(100);
        connection2.send(consumerInfo2);

        // Then we should get the messages.
        for (int i = 0; i < 4; i++) {
            Message m2 = receiveMessage(connection2);
            assertNotNull("Did not get message " + i, m2);
        }
        assertNoMessagesLeft(connection2);
    }

    /**
     * Sends one persistent queue message, restarts the broker, and asserts
     * that a fresh consumer receives a message with the same message id.
     *
     * @throws Exception if any broker interaction fails
     */
    public void testQueuePersistentMessagesNotLostOnRestart() throws Exception {

        ActiveMQDestination destination = new ActiveMQQueue("TEST");

        // Setup the producer and send the message.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);
        Message message = createMessage(producerInfo, destination);
        message.setPersistent(true);
        connection.send(message);
        connection.request(closeConnectionInfo(connectionInfo));

        // restart the broker.
        restartBroker();

        // Setup the consumer and receive the message.
        connection = createConnection();
        connectionInfo = createConnectionInfo();
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        // The persistent message must have survived the broker restart.
        Message m = receiveMessage(connection);
        assertNotNull("Should have received a message by now!", m);
        // expected value first, so a failure reports the ids the right way round
        assertEquals(message.getMessageId(), m.getMessageId());
    }

    /**
     * Sends one non-persistent queue message, restarts the broker, and asserts
     * that a fresh consumer receives nothing.
     *
     * @throws Exception if any broker interaction fails
     */
    public void testQueueNonPersistentMessagesLostOnRestart() throws Exception {

        ActiveMQDestination destination = new ActiveMQQueue("TEST");

        // Setup the producer and send the message.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);
        Message message = createMessage(producerInfo, destination);
        message.setPersistent(false);
        connection.send(message);

        // restart the broker.
        restartBroker();

        // Setup the consumer and receive the message.
        connection = createConnection();
        connectionInfo = createConnectionInfo();
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        // Message should have been dropped due to broker restart.
        assertNoMessagesLeft(connection);
    }

    /**
     * Sends four persistent queue messages inside a local transaction, commits
     * it (one-phase), restarts the broker, and expects a fresh consumer to
     * receive exactly four messages.
     *
     * @throws Exception if any broker interaction fails
     */
    public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws Exception {

        ActiveMQDestination destination = new ActiveMQQueue("TEST");

        // Setup the producer and send the message.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);

        // Begin the transaction.
        LocalTransactionId txid = createLocalTransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));

        for (int i = 0; i < 4; i++) {
            Message message = createMessage(producerInfo, destination);
            message.setPersistent(true);
            message.setTransactionId(txid);
            connection.send(message);
        }

        // Commit
        connection.send(createCommitTransaction1Phase(connectionInfo, txid));
        connection.request(closeConnectionInfo(connectionInfo));

        // restart the broker.
        restartBroker();

        // Setup the consumer and receive the message.
        connection = createConnection();
        connectionInfo = createConnectionInfo();
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        for (int i = 0; i < 4; i++) {
            Message m = receiveMessage(connection);
            assertNotNull(m);
        }

        assertNoMessagesLeft(connection);
    }

    /**
     * Sends four persistent queue messages, acknowledges each of them inside a
     * local transaction that is committed (one-phase), restarts the broker, and
     * asserts that a fresh consumer receives nothing.
     *
     * @throws Exception if any broker interaction fails
     */
    public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception {

        ActiveMQDestination destination = new ActiveMQQueue("TEST");

        // Setup the producer and send the message.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);

        for (int i = 0; i < 4; i++) {
            Message message = createMessage(producerInfo, destination);
            message.setPersistent(true);
            connection.send(message);
        }

        // Setup the consumer and receive the message.
        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        // Begin the transaction.
        LocalTransactionId txid = createLocalTransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));
        for (int i = 0; i < 4; i++) {
            Message m = receiveMessage(connection);
            assertNotNull(m);
            MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
            ack.setTransactionId(txid);
            connection.send(ack);
        }

        // Commit
        connection.send(createCommitTransaction1Phase(connectionInfo, txid));
        connection.request(closeConnectionInfo(connectionInfo));

        // restart the broker.
        restartBroker();

        // Setup the consumer and receive the message.
        connection = createConnection();
        connectionInfo = createConnectionInfo();
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        // No messages should be delivered.
        Message m = receiveMessage(connection);
        assertNull(m);
    }

    /**
     * Sends four persistent queue messages, acknowledges each of them inside a
     * local transaction that is never committed, restarts the broker, and
     * expects a fresh consumer to receive all four again.
     *
     * @throws Exception if any broker interaction fails
     */
    public void testQueuePersistentUncommitedAcksLostOnRestart() throws Exception {

        ActiveMQDestination destination = new ActiveMQQueue("TEST");

        // Setup the producer and send the message.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);

        for (int i = 0; i < 4; i++) {
            Message message = createMessage(producerInfo, destination);
            message.setPersistent(true);
            connection.send(message);
        }

        // Setup the consumer and receive the message.
        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        // Begin the transaction.
        LocalTransactionId txid = createLocalTransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));
        for (int i = 0; i < 4; i++) {
            Message m = receiveMessage(connection);
            assertNotNull(m);
            MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
            ack.setTransactionId(txid);
            connection.send(ack);
        }

        // Don't commit

        // restart the broker.
        restartBroker();

        // Setup the consumer and receive the message.
        connection = createConnection();
        connectionInfo = createConnectionInfo();
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        // All messages should be re-delivered.
        for (int i = 0; i < 4; i++) {
            Message m = receiveMessage(connection);
            assertNotNull(m);
        }

        assertNoMessagesLeft(connection);
    }

    /**
     * Sends 100 persistent queue messages, receives them all and sends a single
     * ack covering all of them inside an XA transaction that is never
     * committed, restarts the broker, and expects a fresh consumer to receive
     * all 100 again.
     *
     * @throws Exception if any broker interaction fails
     */
    public void testQueuePersistentXAUncommitedAcksLostOnRestart() throws Exception {
        final int messageCount = 100;
        ActiveMQDestination destination = new ActiveMQQueue("TEST");

        // Setup the producer and send the message.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);

        for (int i = 0; i < messageCount; i++) {
            Message message = createMessage(producerInfo, destination);
            message.setPersistent(true);
            connection.send(message);
        }

        // Setup the consumer and receive the message.
        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        // Begin the transaction.
        XATransactionId txid = createXATransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));
        Message m = null;
        for (int i = 0; i < messageCount; i++) {
            m = receiveMessage(connection);
            assertNotNull(m);
        }
        // One ack, built from the last received message, with a count of messageCount.
        MessageAck ack = createAck(consumerInfo, m, messageCount, MessageAck.STANDARD_ACK_TYPE);
        ack.setTransactionId(txid);
        connection.send(ack);

        // Don't commit

        // restart the broker.
        restartBroker();

        // Setup the consumer and receive the message.
        connection = createConnection();
        connectionInfo = createConnectionInfo();
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        // All messages should be re-delivered.
        for (int i = 0; i < messageCount; i++) {
            m = receiveMessage(connection);
            assertNotNull(m);
        }

        assertNoMessagesLeft(connection);
    }

    /**
     * @return the suite built for this class by the inherited {@code suite(Class)} helper
     */
    public static Test suite() {
        return suite(RecoveryBrokerTest.class);
    }

    /**
     * Runs the suite with the JUnit text runner.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        junit.textui.TestRunner.run(suite());
    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java new file mode 100644 index 0000000..032934b --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.activemq.broker;

import java.util.Arrays;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks the JMSRedelivered flag and the JMSXDeliveryCount property of
 * messages that are delivered again after the broker has been restarted.
 *
 * The class is parameterized over the persistence adapters returned by
 * {@link #data()}, and the broker's default destination policy has
 * {@code persistJMSRedelivered} switched on (see {@link #configureBroker}).
 */
@RunWith(value = Parameterized.class)
public class RedeliveryRestartTest extends TestSupport {

    private static final Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class);
    ActiveMQConnection connection;
    BrokerService broker = null;
    String queueName = "redeliveryRestartQ";

    @Parameterized.Parameter
    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = PersistenceAdapterChoice.KahaDB;

    /**
     * @return one parameter row per persistence adapter under test
     */
    @Parameterized.Parameters(name="Store={0}")
    public static Iterable<Object[]> data() {
        return Arrays.asList(new Object[][]{
            {TestSupport.PersistenceAdapterChoice.KahaDB},
            {TestSupport.PersistenceAdapterChoice.JDBC},
            {TestSupport.PersistenceAdapterChoice.LevelDB}});
    }

    /**
     * Starts a freshly configured broker with
     * {@code deleteAllMessagesOnStartup} set to true.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        broker = new BrokerService();
        configureBroker(broker);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.start();
    }

    /**
     * Closes the client connection, stops the broker and runs the parent
     * tear-down. Each step runs even if the previous one throws, and a broker
     * that was never created (setUp failed early) is skipped.
     */
    @Override
    @After
    public void tearDown() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            try {
                if (broker != null) {
                    broker.stop();
                }
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * Applies the configuration shared by the initial and the restarted
     * broker: a default policy with {@code persistJMSRedelivered} enabled, the
     * parameterized persistence adapter, and a TCP connector on port 0.
     *
     * @param broker the broker to configure (not yet started)
     */
    protected void configureBroker(BrokerService broker) throws Exception {
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policy = new PolicyEntry();
        policy.setPersistJMSRedelivered(true);
        policyMap.setDefaultEntry(policy);
        broker.setDestinationPolicy(policyMap);
        setPersistenceAdapter(broker, persistenceAdapterChoice);
        broker.addConnector("tcp://0.0.0.0:0");
    }

    /**
     * CLIENT_ACKNOWLEDGE queue consumer: five of ten messages are received but
     * not acknowledged before the restart. After the restart those five are
     * expected with the redelivered flag set and a delivery count of 2, and the
     * remaining five as first deliveries.
     */
    @org.junit.Test
    public void testValidateRedeliveryFlagAfterRestartNoTx() throws Exception {

        connection = connectToBroker(failoverBrokerUrl());
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = session.createQueue(queueName);
        populateDestination(10, destination, connection);

        MessageConsumer consumer = session.createConsumer(destination);
        TextMessage msg = null;
        for (int i = 0; i < 5; i++) {
            msg = (TextMessage) consumer.receive(20000);
            LOG.info("not redelivered? got: {}", msg);
            assertNotNull("got the message", msg);
            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
            assertFalse("not a redelivery", msg.getJMSRedelivered());
        }
        consumer.close();

        restartBroker();
        registerRestartedBrokerWithFailover();

        consumer = session.createConsumer(destination);
        for (int i = 0; i < 5; i++) {
            msg = (TextMessage) consumer.receive(4000);
            LOG.info("redelivered? got: {}", msg);
            assertNotNull("got the message again", msg);
            assertTrue("re delivery flag", msg.getJMSRedelivered());
            assertEquals("redelivery count survives restart", 2, msg.getLongProperty("JMSXDeliveryCount"));
            msg.acknowledge();
        }

        // consume the rest that were not redeliveries
        for (int i = 0; i < 5; i++) {
            msg = (TextMessage) consumer.receive(20000);
            LOG.info("not redelivered? got: {}", msg);
            assertNotNull("got the message", msg);
            assertFalse("not a redelivery", msg.getJMSRedelivered());
            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
            msg.acknowledge();
        }
        connection.close();
    }

    /**
     * Durable topic subscriber: five of ten messages are received but not
     * acknowledged before the restart. After the restart all ten are expected
     * again, none of them flagged as redelivered.
     */
    @org.junit.Test
    public void testDurableSubRedeliveryFlagAfterRestartNotSupported() throws Exception {

        connection = connectToBroker(failoverBrokerUrl());
        connection.setClientID("id");
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        ActiveMQTopic destination = new ActiveMQTopic(queueName);

        TopicSubscriber durableSub = session.createDurableSubscriber(destination, "id");

        populateDestination(10, destination, connection);

        TextMessage msg = null;
        for (int i = 0; i < 5; i++) {
            msg = (TextMessage) durableSub.receive(20000);
            LOG.info("not redelivered? got: {}", msg);
            assertNotNull("got the message", msg);
            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
            assertFalse("not a redelivery", msg.getJMSRedelivered());
        }
        durableSub.close();

        restartBroker();
        registerRestartedBrokerWithFailover();

        durableSub = session.createDurableSubscriber(destination, "id");
        for (int i = 0; i < 10; i++) {
            msg = (TextMessage) durableSub.receive(4000);
            LOG.info("redelivered? got: {}", msg);
            assertNotNull("got the message again", msg);
            assertFalse("no reDelivery flag", msg.getJMSRedelivered());
            msg.acknowledge();
        }
        connection.close();
    }

    /**
     * Transacted queue consumer: five of ten messages are received and the
     * session is rolled back before the restart. After the restart those five
     * are expected with a delivery count of 2 and the redelivered flag set,
     * and the remaining five as first deliveries.
     */
    @org.junit.Test
    public void testValidateRedeliveryFlagAfterRestart() throws Exception {

        connection = connectToBroker(failoverBrokerUrl());
        connection.start();

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Destination destination = session.createQueue(queueName);
        populateDestination(10, destination, connection);

        MessageConsumer consumer = session.createConsumer(destination);
        TextMessage msg = null;
        for (int i = 0; i < 5; i++) {
            msg = (TextMessage) consumer.receive(20000);
            LOG.info("not redelivered? got: {}", msg);
            assertNotNull("got the message", msg);
            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
            assertFalse("not a redelivery", msg.getJMSRedelivered());
        }
        session.rollback();
        consumer.close();

        restartBroker();
        registerRestartedBrokerWithFailover();

        consumer = session.createConsumer(destination);
        for (int i = 0; i < 5; i++) {
            msg = (TextMessage) consumer.receive(4000);
            LOG.info("redelivered? got: {}", msg);
            assertNotNull("got the message again", msg);
            assertEquals("redelivery count survives restart", 2, msg.getLongProperty("JMSXDeliveryCount"));
            assertTrue("re delivery flag", msg.getJMSRedelivered());
        }
        session.commit();

        // consume the rest that were not redeliveries
        for (int i = 0; i < 5; i++) {
            msg = (TextMessage) consumer.receive(20000);
            LOG.info("not redelivered? got: {}", msg);
            assertNotNull("got the message", msg);
            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
            assertFalse("not a redelivery", msg.getJMSRedelivered());
        }
        session.commit();

        connection.close();
    }

    /**
     * Transacted queue consumer over a plain (non-failover) connection: one
     * message is received, the broker is stopped via
     * {@code stopBrokerWithStoreFailure}, and a new broker is started. A new
     * connection is then expected to receive the message with a delivery count
     * of 2 and the redelivered flag set.
     */
    @org.junit.Test
    public void testValidateRedeliveryFlagAfterRecovery() throws Exception {
        connection = connectToBroker(brokerConnectString() + "?jms.prefetchPolicy.all=0");
        connection.start();

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Destination destination = session.createQueue(queueName);
        populateDestination(1, destination, connection);

        MessageConsumer consumer = session.createConsumer(destination);
        TextMessage msg = (TextMessage) consumer.receive(5000);
        LOG.info("got: {}", msg);
        assertNotNull("got the message", msg);
        assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
        assertFalse("not a redelivery", msg.getJMSRedelivered());

        stopBrokerWithStoreFailure(broker, persistenceAdapterChoice);

        broker = createRestartedBroker();
        broker.start();

        connection.close();

        connection = connectToBroker(brokerConnectString());
        connection.start();

        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        consumer = session.createConsumer(destination);
        msg = (TextMessage) consumer.receive(10000);
        assertNotNull("got the message again", msg);
        assertEquals("redelivery count survives restart", 2, msg.getLongProperty("JMSXDeliveryCount"));
        assertTrue("re delivery flag", msg.getJMSRedelivered());

        session.commit();
        connection.close();
    }

    /** Returns the publishable connect string of the current broker's first transport connector. */
    private String brokerConnectString() throws Exception {
        return broker.getTransportConnectors().get(0).getPublishableConnectString();
    }

    /** Returns a failover URL for the current broker with all prefetch limits set to 0. */
    private String failoverBrokerUrl() throws Exception {
        return "failover:(" + brokerConnectString() + ")?jms.prefetchPolicy.all=0";
    }

    /** Creates (but does not start) a connection to the given broker URL. */
    private ActiveMQConnection connectToBroker(String brokerUrl) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        return (ActiveMQConnection) connectionFactory.createConnection();
    }

    /** Makes the failover transport of {@link #connection} aware of the restarted broker's auto assigned port. */
    private void registerRestartedBrokerWithFailover() throws Exception {
        connection.getTransport().narrow(FailoverTransport.class).add(true, brokerConnectString());
    }

    /** Stops the current broker, waits until it has stopped, and starts a replacement. */
    private void restartBroker() throws Exception {
        broker.stop();
        broker.waitUntilStopped();
        broker = createRestartedBroker();
        broker.start();
    }

    /**
     * Builds a new, not yet started broker with the shared configuration.
     * Unlike {@link #setUp()} it does not set
     * {@code deleteAllMessagesOnStartup}. The caller is responsible for
     * assigning the result to {@link #broker}.
     *
     * @return the configured broker
     */
    private BrokerService createRestartedBroker() throws Exception {
        BrokerService restarted = new BrokerService();
        configureBroker(restarted);
        return restarted;
    }

    /**
     * Sends {@code nbMessages} text messages of the form
     * {@code <hello id='i'/>} (i starting at 1) to the destination, using a
     * short-lived AUTO_ACKNOWLEDGE session that is closed even if a send fails.
     *
     * @param nbMessages  number of messages to send
     * @param destination where to send them
     * @param connection  connection on which the temporary session is created
     * @throws JMSException if creating the session/producer or sending fails
     */
    private void populateDestination(final int nbMessages, final Destination destination, javax.jms.Connection connection) throws JMSException {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            MessageProducer producer = session.createProducer(destination);
            for (int i = 1; i <= nbMessages; i++) {
                producer.send(session.createTextMessage("<hello id='" + i + "'/>"));
            }
            producer.close();
        } finally {
            session.close();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java new file mode 100644 index 0000000..5d8b62e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java @@ -0,0 +1,472 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.io.File; +import java.io.IOException; +import java.util.Set; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.ProxyMessageStore; +import org.apache.activemq.store.ProxyTopicMessageStore; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.usage.SystemUsage; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class 
RedeliveryRestartWithExceptionTest extends TestSupport { + + private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartWithExceptionTest.class); + ActiveMQConnection connection; + BrokerService broker = null; + String queueName = "redeliveryRestartQ"; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + broker = new BrokerService(); + configureBroker(broker, true); + broker.setDeleteAllMessagesOnStartup(true); + broker.start(); + } + + @Override + @After + public void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + broker.stop(); + super.tearDown(); + } + + protected void configureBroker(BrokerService broker, boolean throwExceptionOnUpdate) throws Exception { + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setPersistJMSRedelivered(true); + policyMap.setDefaultEntry(policy); + broker.setDestinationPolicy(policyMap); + broker.setPersistenceAdapter(new KahaDBWithUpdateExceptionPersistenceAdapter(throwExceptionOnUpdate)); + broker.addConnector("tcp://0.0.0.0:0"); + } + + @org.junit.Test + public void testValidateRedeliveryFlagAfterRestart() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + + "?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createQueue(queueName); + populateDestination(10, destination, connection, true); + TextMessage msg = null; + MessageConsumer consumer = session.createConsumer(destination); + Exception expectedException = null; + try { + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(5000); + LOG.info("not redelivered? 
got: " + msg); + assertNotNull("got the message", msg); + assertTrue("Should not receive the 5th message", i < 4); + //The first 4 messages will be ok but the 5th one should hit an exception in updateMessage and should not be delivered + } + } catch (Exception e) { + //Expecting an exception and disconnect on the 5th message + LOG.info("Got expected:", e); + expectedException = e; + } + assertNotNull("Expecting an exception when updateMessage fails", expectedException); + + consumer.close(); + connection.close(); + + restartBroker(); + + connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + + "?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = session.createQueue(queueName); + consumer = session.createConsumer(destination); + + + // consume the messages that were previously delivered + for (int i = 0; i < 4; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("redelivered? got: " + msg); + assertNotNull("got the message again", msg); + assertEquals("re delivery flag", true, msg.getJMSRedelivered()); + assertTrue("redelivery count survives restart", msg.getLongProperty("JMSXDeliveryCount") > 1); + msg.acknowledge(); + } + + + // consume the rest that were not redeliveries + for (int i = 0; i < 6; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("not redelivered? 
got: " + msg); + assertNotNull("got the message", msg); + assertEquals("not a redelivery", false, msg.getJMSRedelivered()); + assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); + msg.acknowledge(); + } + connection.close(); + } + + + @org.junit.Test + public void testValidateRedeliveryFlagAfterTransientFailureConnectionDrop() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + + "?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createQueue(queueName); + populateDestination(10, destination, connection, true); + TextMessage msg = null; + MessageConsumer consumer = session.createConsumer(destination); + Exception expectedException = null; + try { + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(5000); + LOG.info("not redelivered? 
got: " + msg); + assertNotNull("got the message", msg); + assertTrue("Should not receive the 5th message", i < 4); + //The first 4 messages will be ok but the 5th one should hit an exception in updateMessage and should not be delivered + } + } catch (Exception e) { + //Expecting an exception and disconnect on the 5th message + LOG.info("Got expected:", e); + expectedException = e; + } + assertNotNull("Expecting an exception when updateMessage fails", expectedException); + + consumer.close(); + connection.close(); + + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = session.createQueue(queueName); + consumer = session.createConsumer(destination); + + + // consume the messages that were previously delivered + for (int i = 0; i < 4; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("redelivered? got: " + msg); + assertNotNull("got the message again", msg); + assertEquals("re delivery flag on:" + i, true, msg.getJMSRedelivered()); + assertTrue("redelivery count survives reconnect for:" + i, msg.getLongProperty("JMSXDeliveryCount") > 1); + msg.acknowledge(); + } + + + // consume the rest that were not redeliveries + for (int i = 0; i < 6; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("not redelivered? 
got: " + msg); + assertNotNull("got the message", msg); + assertEquals("not a redelivery", false, msg.getJMSRedelivered()); + assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); + msg.acknowledge(); + } + connection.close(); + } + + @org.junit.Test + public void testValidateRedeliveryFlagOnNonPersistentAfterTransientFailureConnectionDrop() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + + "?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createQueue(queueName); + populateDestination(10, destination, connection, false); + TextMessage msg = null; + MessageConsumer consumer = session.createConsumer(destination); + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(5000); + assertNotNull("got the message", msg); + assertFalse("not redelivered", msg.getJMSRedelivered()); + } + + connection.getTransport().narrow(TcpTransport.class).getTransportListener().onException(new IOException("Die")); + + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = session.createQueue(queueName); + consumer = session.createConsumer(destination); + + // consume the messages that were previously delivered + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("redelivered? 
got: " + msg); + assertNotNull("got the message again", msg); + assertEquals("redelivery flag set on:" + i, true, msg.getJMSRedelivered()); + assertTrue("redelivery count survives reconnect for:" + i, msg.getLongProperty("JMSXDeliveryCount") > 1); + msg.acknowledge(); + } + + // consume the rest that were not redeliveries + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("not redelivered? got: " + msg); + assertNotNull("got the message", msg); + assertEquals("not a redelivery", false, msg.getJMSRedelivered()); + assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); + msg.acknowledge(); + } + connection.close(); + } + + private void restartBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = createRestartedBroker(); + broker.start(); + } + + private BrokerService createRestartedBroker() throws Exception { + broker = new BrokerService(); + configureBroker(broker, false); + return broker; + } + + private void populateDestination(final int nbMessages, final Destination destination, javax.jms.Connection connection, boolean persistent) throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(persistent ? 
DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + for (int i = 1; i <= nbMessages; i++) { + producer.send(session.createTextMessage("<hello id='" + i + "'/>")); + } + producer.close(); + session.close(); + } + + private class KahaDBWithUpdateExceptionPersistenceAdapter implements PersistenceAdapter { + + private KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter(); + private boolean throwExceptionOnUpdate; + + public KahaDBWithUpdateExceptionPersistenceAdapter(boolean throwExceptionOnUpdate) { + this.throwExceptionOnUpdate = throwExceptionOnUpdate; + } + + @Override + public void start() throws Exception { + kahaDB.start(); + } + + @Override + public void stop() throws Exception { + kahaDB.stop(); + } + + @Override + public Set<ActiveMQDestination> getDestinations() { + return kahaDB.getDestinations(); + } + + @Override + public MessageStore createQueueMessageStore(ActiveMQQueue destination) + throws IOException { + MessageStore proxyMessageStoreWithException = new ProxyMessageStoreWithUpdateException( + kahaDB.createQueueMessageStore(destination), throwExceptionOnUpdate); + return proxyMessageStoreWithException; + } + + @Override + public TopicMessageStore createTopicMessageStore( + ActiveMQTopic destination) throws IOException { + TopicMessageStore proxyMessageStoreWithException = new ProxyTopicMessageStoreWithUpdateException( + kahaDB.createTopicMessageStore(destination), throwExceptionOnUpdate); + return proxyMessageStoreWithException; + } + + @Override + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { + return kahaDB.createJobSchedulerStore(); + } + + @Override + public void removeQueueMessageStore(ActiveMQQueue destination) { + kahaDB.removeQueueMessageStore(destination); + } + + @Override + public void removeTopicMessageStore(ActiveMQTopic destination) { + kahaDB.removeTopicMessageStore(destination); + } + + @Override + public TransactionStore createTransactionStore() throws 
IOException { + return kahaDB.createTransactionStore(); + } + + @Override + public void beginTransaction(ConnectionContext context) + throws IOException { + kahaDB.beginTransaction(context); + } + + @Override + public void commitTransaction(ConnectionContext context) + throws IOException { + kahaDB.commitTransaction(context); + } + + @Override + public void rollbackTransaction(ConnectionContext context) + throws IOException { + kahaDB.rollbackTransaction(context); + } + + @Override + public long getLastMessageBrokerSequenceId() throws IOException { + return kahaDB.getLastMessageBrokerSequenceId(); + } + + @Override + public void deleteAllMessages() throws IOException { + kahaDB.deleteAllMessages(); + } + + @Override + public void setUsageManager(SystemUsage usageManager) { + kahaDB.setUsageManager(usageManager); + } + + @Override + public void setBrokerName(String brokerName) { + kahaDB.setBrokerName(brokerName); + } + + @Override + public void setDirectory(File dir) { + kahaDB.setDirectory(dir); + } + + @Override + public File getDirectory() { + return kahaDB.getDirectory(); + } + + @Override + public void checkpoint(boolean sync) throws IOException { + kahaDB.checkpoint(sync); + } + + @Override + public long size() { + return kahaDB.size(); + } + + @Override + public long getLastProducerSequenceId(ProducerId id) throws IOException { + return kahaDB.getLastProducerSequenceId(id); + } + + } + + private class ProxyMessageStoreWithUpdateException extends ProxyMessageStore { + private boolean throwExceptionOnUpdate; + private int numBeforeException = 4; + public ProxyMessageStoreWithUpdateException(MessageStore delegate, boolean throwExceptionOnUpdate) { + super(delegate); + this.throwExceptionOnUpdate = throwExceptionOnUpdate; + } + + @Override + public void updateMessage(Message message) throws IOException { + if(throwExceptionOnUpdate) { + if(numBeforeException > 0) { + numBeforeException--; + super.updateMessage(message); + } else { + // lets only do it once so we 
can validate transient store failure + throwExceptionOnUpdate = false; + + //A message that has never been delivered will hit this exception + throw new IOException("Hit our simulated exception writing the update to disk"); + } + } else { + super.updateMessage(message); + } + } + } + + private class ProxyTopicMessageStoreWithUpdateException extends ProxyTopicMessageStore { + private boolean throwExceptionOnUpdate; + private int numBeforeException = 4; + public ProxyTopicMessageStoreWithUpdateException(TopicMessageStore delegate, boolean throwExceptionOnUpdate) { + super(delegate); + this.throwExceptionOnUpdate = throwExceptionOnUpdate; + } + + @Override + public void updateMessage(Message message) throws IOException { + if(throwExceptionOnUpdate) { + if(numBeforeException > 0) { + numBeforeException--; + super.updateMessage(message); + } else { + //A message that has never been delivered will hit this exception + throw new IOException("Hit our simulated exception writing the update to disk"); + } + } else { + super.updateMessage(message); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/SpringTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/SpringTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/SpringTest.java new file mode 100644 index 0000000..7902baa --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/SpringTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker; + +import java.io.File; +import java.util.Iterator; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.activemq.spring.SpringConsumer; +import org.apache.activemq.spring.SpringProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; +
/**
 * Loads a Spring XML configuration that defines a {@code consumer} and a
 * {@code producer} bean and checks that the consumer receives every message
 * the producer reports having sent.
 */
public class SpringTest extends TestCase {

    private static final Logger LOG = LoggerFactory.getLogger(SpringTest.class);

    protected AbstractApplicationContext context;
    protected SpringConsumer consumer;
    protected SpringProducer producer;

    public void testSenderWithSpringXml() throws Exception {
        assertSenderConfig("org/apache/activemq/broker/spring.xml");
    }

    /**
     * Shared scenario used by the test methods: loads the given Spring
     * configuration, starts its consumer and producer, and asserts that the
     * number of received messages equals the producer's message count.
     *
     * @param config classpath location of the Spring XML configuration
     * @throws Exception if the context, consumer or producer fails
     */
    protected void assertSenderConfig(String config) throws Exception {
        context = new ClassPathXmlApplicationContext(config);

        consumer = (SpringConsumer) context.getBean("consumer");
        assertNotNull("Found a valid consumer", consumer);

        consumer.start();

        producer = (SpringProducer) context.getBean("producer");
        assertNotNull("Found a valid producer", producer);

        // discard anything received before the producer starts
        consumer.flushMessages();
        producer.start();

        // give JMS time to dispatch: wait for the expected number of messages
        consumer.waitForMessagesToArrive(producer.getMessageCount());

        // now lets check that the consumer has received some messages
        List<?> messages = consumer.flushMessages();
        LOG.info("Consumer has received messages....");
        for (Object message : messages) {
            LOG.info("Received: " + message);
        }

        assertEquals("Message count", producer.getMessageCount(), messages.size());
    }

    /**
     * Stops the consumer and producer and destroys the Spring context. Every
     * step runs even if an earlier one throws, and the superclass teardown is
     * always invoked last.
     *
     * @throws Exception if any shutdown step fails
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (consumer != null) {
                consumer.stop();
            }
        } finally {
            try {
                if (producer != null) {
                    producer.stop();
                }
            } finally {
                try {
                    if (context != null) {
                        context.destroy();
                    }
                } finally {
                    super.tearDown();
                }
            }
        }
    }

    /**
     * Defaults the {@code basedir} system property to the current working
     * directory when it is not already set, then runs the superclass setup.
     *
     * @throws Exception if the superclass setup fails
     */
    @Override
    protected void setUp() throws Exception {
        if (System.getProperty("basedir") == null) {
            File file = new File(".");
            System.setProperty("basedir", file.getAbsolutePath());
        }
        super.setUp();
    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubBroker.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubBroker.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubBroker.java new file mode 100644 index 0000000..7b4fa1b --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubBroker.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.broker; + +import java.util.LinkedList; +import org.apache.activemq.command.ConnectionInfo; +
/**
 * Broker stub that records every {@code addConnection} and
 * {@code removeConnection} call so tests can inspect the arguments later.
 * Nothing else is done with the calls.
 */
public class StubBroker extends EmptyBroker {
    /** One entry per {@link #addConnection} call, in call order. */
    public LinkedList<AddConnectionData> addConnectionData = new LinkedList<AddConnectionData>();
    /** One entry per {@link #removeConnection} call, in call order. */
    public LinkedList<RemoveConnectionData> removeConnectionData = new LinkedList<RemoveConnectionData>();

    /**
     * Arguments captured from a single {@code addConnection} call.
     * Static, like {@link RemoveConnectionData}: it needs no reference to the
     * enclosing broker.
     */
    public static class AddConnectionData {
        public final ConnectionContext connectionContext;
        public final ConnectionInfo connectionInfo;

        public AddConnectionData(ConnectionContext context, ConnectionInfo info) {
            connectionContext = context;
            connectionInfo = info;
        }
    }

    /** Arguments captured from a single {@code removeConnection} call. */
    public static class RemoveConnectionData {
        public final ConnectionContext connectionContext;
        public final ConnectionInfo connectionInfo;
        public final Throwable error;

        public RemoveConnectionData(ConnectionContext context, ConnectionInfo info, Throwable error) {
            connectionContext = context;
            connectionInfo = info;
            this.error = error;
        }
    }

    /** Records the call; the connection itself is not processed. */
    @Override
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        addConnectionData.add(new AddConnectionData(context, info));
    }

    /** Records the call, including the error (if any) that caused the removal. */
    @Override
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        removeConnectionData.add(new RemoveConnectionData(context, info, error));
    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubConnection.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubConnection.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubConnection.java new file mode 100644 index 0000000..9a70c4e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubConnection.java @@ -0,0 +1,164 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.activemq.Service; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.transport.DefaultTransportListener; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.activemq.util.ServiceSupport; +
/**
 * Test-side connection that talks to a broker either through a broker
 * {@link Connection} object or through a {@link Transport}.
 *
 * In transport mode every inbound command is forwarded to the optional
 * {@link TransportListener} and then placed on the dispatch queue, and the
 * last transport error is kept in {@link #error}.
 */
public class StubConnection implements Service {

    private final BlockingQueue<Object> dispatchQueue = new LinkedBlockingQueue<Object>();
    private Connection connection;
    private Transport transport;
    private boolean shuttingDown;
    private TransportListener listener;
    /** Most recent exception reported by the transport, or {@code null} if none. */
    public AtomicReference<Throwable> error = new AtomicReference<Throwable>();

    /**
     * Connects to {@code broker} through its VM connector.
     *
     * @param broker broker to connect to
     * @throws Exception if the transport cannot be created or started
     */
    public StubConnection(BrokerService broker) throws Exception {
        this(TransportFactory.connect(broker.getVmConnectorURI()));
    }

    /**
     * Wraps an existing broker-side connection; commands are passed directly
     * to {@link Connection#service}.
     *
     * @param connection connection that will service the commands
     */
    public StubConnection(Connection connection) {
        this.connection = connection;
    }

    /**
     * Wraps {@code transport} without an additional listener.
     *
     * @param transport transport to use; it is started by this constructor
     * @throws Exception if the transport fails to start
     */
    public StubConnection(Transport transport) throws Exception {
        this(transport, null);
    }

    /**
     * Wraps {@code transport}, installs the internal listener and starts it.
     *
     * @param transport         transport to use; it is started by this constructor
     * @param transportListener optional listener notified of commands and errors; may be {@code null}
     * @throws Exception if the transport fails to start
     */
    public StubConnection(Transport transport, TransportListener transportListener) throws Exception {
        listener = transportListener;
        this.transport = transport;
        transport.setTransportListener(new DefaultTransportListener() {
            @Override
            public void onCommand(Object command) {
                try {
                    if (command.getClass() == ShutdownInfo.class) {
                        shuttingDown = true;
                    }
                    StubConnection.this.dispatch(command);
                } catch (InterruptedException e) {
                    // keep the interrupt visible to the transport thread
                    Thread.currentThread().interrupt();
                    onException(new IOException(e));
                } catch (Exception e) {
                    // IOException(Throwable) keeps the same message ("" + e) and adds the cause
                    onException(new IOException(e));
                }
            }

            @Override
            public void onException(IOException e) {
                TransportListener current = listener;
                if (current != null) {
                    current.onException(e);
                }
                error.set(e);
            }
        });
        transport.start();
    }

    /**
     * Hands an inbound command to the listener (if any) and then queues it.
     *
     * @param command command received from the transport
     * @throws InterruptedException if interrupted while queueing
     * @throws IOException          declared for overriding subclasses
     */
    protected void dispatch(Object command) throws InterruptedException, IOException {
        // read the field once so a concurrent setListener(null) cannot cause an NPE
        TransportListener current = listener;
        if (current != null) {
            current.onCommand(command);
        }
        dispatchQueue.put(command);
    }

    /**
     * @return the queue holding every command dispatched so far, in arrival order
     */
    public BlockingQueue<Object> getDispatchQueue() {
        return dispatchQueue;
    }

    /**
     * Sends {@code command} without requesting a response.
     *
     * @param command command to send; its response-required flag is cleared
     * @throws Exception if the broker connection answers with an exception
     *                   response or the transport fails
     */
    public void send(Command command) throws Exception {
        alignProducerId(command);
        command.setResponseRequired(false);
        if (connection != null) {
            failOnException(connection.service(command));
        } else if (transport != null) {
            transport.oneway(command);
        }
    }

    /**
     * Sends {@code command} and waits for its response.
     *
     * @param command command to send; its response-required flag is set
     * @return the response, or {@code null} when neither a connection nor a
     *         transport is configured
     * @throws Exception if the response carries an exception or the transport fails
     */
    public Response request(Command command) throws Exception {
        alignProducerId(command);
        command.setResponseRequired(true);
        if (connection != null) {
            return failOnException(connection.service(command));
        }
        if (transport != null) {
            return failOnException((Response) transport.request(command));
        }
        return null;
    }

    /** For messages, copies the producer id embedded in the message id onto the message. */
    private static void alignProducerId(Command command) {
        if (command instanceof Message) {
            Message message = (Message) command;
            message.setProducerId(message.getMessageId().getProducerId());
        }
    }

    /** Returns {@code response} unchanged, or throws the JMS form of the exception it carries. */
    private static Response failOnException(Response response) throws Exception {
        if (response != null && response.isException()) {
            ExceptionResponse er = (ExceptionResponse) response;
            throw JMSExceptionSupport.create(er.getException());
        }
        return response;
    }

    public Connection getConnection() {
        return connection;
    }

    public Transport getTransport() {
        return transport;
    }

    /** No-op: a transport-backed instance is already started by its constructor. */
    @Override
    public void start() throws Exception {
    }

    /**
     * Tells the peer the connection is shutting down and disposes the
     * transport. Has no effect beyond setting the flag in connection mode.
     */
    @Override
    public void stop() throws Exception {
        shuttingDown = true;
        if (transport != null) {
            try {
                transport.oneway(new ShutdownInfo());
            } catch (IOException ignored) {
                // best effort: the transport may already be broken, dispose it regardless
            }
            ServiceSupport.dispose(transport);
        }
    }

    public TransportListener getListener() {
        return listener;
    }

    public void setListener(TransportListener listener) {
        this.listener = listener;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java new file mode 100644 index 0000000..61ba79c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.ThreadTracker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Re-runs the multi-client scenarios inherited from
+ * {@link QueueSubscriptionTest} with the {@code durable} and {@code topic}
+ * flags switched on.
+ *
+ * <p>Every scenario sets the client and message parameters, calls
+ * {@code doMultipleClientsTest()} and then expects
+ * {@code messageCount * producerCount * consumerCount} messages to be
+ * received in total.
+ *
+ * <p>NOTE(review): how the superclass interprets {@code durable} and
+ * {@code topic} is not visible from this file; presumably they select durable
+ * topic subscribers. Confirm against {@code QueueSubscriptionTest}.
+ */
+@RunWith(BlockJUnit4ClassRunner.class)
+public class TopicSubscriptionTest extends QueueSubscriptionTest {
+
+    /** Time limit applied to every test in this class, in milliseconds. */
+    private static final long TEST_TIMEOUT_MS = 60 * 1000;
+
+    /**
+     * Runs the superclass set-up, then enables the {@code durable} and
+     * {@code topic} flags. The flags are set after {@code super.setUp()} so
+     * that it cannot reset them.
+     *
+     * @throws Exception if the superclass set-up fails
+     */
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        durable = true;
+        topic = true;
+    }
+
+    /**
+     * Runs the superclass tear-down and then {@code ThreadTracker.result()}.
+     *
+     * @throws Exception if the superclass tear-down fails
+     */
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // Always invoke the tracker, even when the superclass tear-down
+            // throws; otherwise a failing tear-down would skip it.
+            ThreadTracker.result();
+        }
+    }
+
+    /** 20 producers, 40 consumers, 100 messages of size 1, prefetch 10. */
+    @Test(timeout = TEST_TIMEOUT_MS)
+    public void testManyProducersManyConsumers() throws Exception {
+        consumerCount = 40;
+        producerCount = 20;
+        messageCount = 100;
+        messageSize = 1;
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+    /** 1 producer, 2 consumers, 10 messages of 1 MB, prefetch 1. */
+    @Test(timeout = TEST_TIMEOUT_MS)
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount = 10;
+        messageSize = 1024 * 1024 * 1; // 1 MB
+        prefetchCount = 1;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+    /** 1 producer, 2 consumers, 1000 messages of size 1024, prefetch 1. */
+    @Test(timeout = TEST_TIMEOUT_MS)
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
+        consumerCount = 2;
+        producerCount = 1;
+        prefetchCount = 1;
+        messageSize = 1024;
+        messageCount = 1000;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+    /**
+     * 1 producer, 2 consumers, 1000 messages of size 1024, prefetch twice the
+     * message count.
+     *
+     * <p>NOTE(review): unlike its siblings this test does not call
+     * {@code assertDestinationMemoryUsageGoesToZero()}. The omission is
+     * preserved as found; confirm whether it is intentional.
+     */
+    @Test(timeout = TEST_TIMEOUT_MS)
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount = 1000;
+        messageSize = 1024;
+        prefetchCount = messageCount * 2;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+    }
+
+    /** 1 producer, 2 consumers, 10 messages of 1 MB, prefetch twice the message count. */
+    @Test(timeout = TEST_TIMEOUT_MS)
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount = 10;
+        messageSize = 1024 * 1024 * 1; // 1 MB
+        prefetchCount = messageCount * 2;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+    /** 1 producer, 50 consumers, 10 one-byte messages, prefetch 10. */
+    @Test(timeout = TEST_TIMEOUT_MS)
+    public void testOneProducerManyConsumersFewMessages() throws Exception {
+        consumerCount = 50;
+        producerCount = 1;
+        messageCount = 10;
+        messageSize = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+    /** 1 producer, 50 consumers, 100 one-byte messages, prefetch 10. */
+    @Test(timeout = TEST_TIMEOUT_MS)
+    public void testOneProducerManyConsumersManyMessages() throws Exception {
+        consumerCount = 50;
+        producerCount = 1;
+        messageCount = 100;
+        messageSize = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+
+    /** 20 producers, 1 consumer, 100 one-byte messages, prefetch 10. */
+    @Test(timeout = TEST_TIMEOUT_MS)
+    public void testManyProducersOneConsumer() throws Exception {
+        consumerCount = 1;
+        producerCount = 20;
+        messageCount = 100;
+        messageSize = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount * consumerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+}
