Repository: activemq Updated Branches: refs/heads/master 2c3046b81 -> c2230fda4
https://issues.apache.org/jira/browse/AMQ-6286 - refine fix to distinguish multiple consumers in a transaction, verify insertion at head will preserve order Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c2230fda Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c2230fda Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c2230fda Branch: refs/heads/master Commit: c2230fda4bd8eed7bba3245ad4b11fcd8e3bd581 Parents: 2c3046b Author: gtully <[email protected]> Authored: Tue May 17 17:03:05 2016 +0100 Committer: gtully <[email protected]> Committed: Wed May 18 10:09:39 2016 +0100 ---------------------------------------------------------------------- .../cursors/QueueDispatchPendingList.java | 10 +- .../QueueOrderSingleTransactedConsumerTest.java | 108 ++++++++++++++++--- 2 files changed, 102 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/c2230fda/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java index a01795c..ae35b4e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java @@ -208,7 +208,7 @@ public class QueueDispatchPendingList implements PendingList { } public void addForRedelivery(List<MessageReference> list, boolean noConsumers) { - if (noConsumers && redeliveredWaitingDispatch instanceof OrderedPendingList) { + if (noConsumers && redeliveredWaitingDispatch instanceof OrderedPendingList && willBeInOrder(list)) { // a single consumer can expect repeatable redelivery order irrespective // of transaction or prefetch boundaries ((OrderedPendingList)redeliveredWaitingDispatch).insertAtHead(list); @@ -218,4 +218,12 @@ public class QueueDispatchPendingList implements PendingList { } } } + + private boolean willBeInOrder(List<MessageReference> list) { + // for a single consumer inserting at head will be in order w.r.t brokerSequence but + // will not be if there were multiple consumers in the mix even if this is the last + // consumer to close (noConsumers==true) + return !redeliveredWaitingDispatch.isEmpty() && list != null && !list.isEmpty() && + redeliveredWaitingDispatch.iterator().next().getMessageId().getBrokerSequenceId() > list.get(list.size() - 1).getMessageId().getBrokerSequenceId(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/c2230fda/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java index 78c50b3..87a967c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java @@ -25,6 +25,8 @@ import org.apache.activemq.command.ActiveMQQueue; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.jms.Connection; import javax.jms.Message; @@ -39,13 +41,26 @@ import static org.junit.Assert.assertEquals; public class QueueOrderSingleTransactedConsumerTest { + private static final Logger LOG = LoggerFactory.getLogger(QueueOrderSingleTransactedConsumerTest.class); + BrokerService broker = null; ActiveMQQueue dest = new ActiveMQQueue("Queue"); @Test public void testSingleConsumerTxRepeat() throws Exception { - publishMessages(100); + // effect the broker sequence id that is region wide + ActiveMQQueue dummyDest = new ActiveMQQueue("AnotherQueue"); + publishMessagesWithOrderProperty(10, 0, dest); + publishMessagesWithOrderProperty(1, 0, dummyDest); + + publishMessagesWithOrderProperty(10, 10, dest); + publishMessagesWithOrderProperty(1, 0, dummyDest); + + publishMessagesWithOrderProperty(10, 20, dest); + publishMessagesWithOrderProperty(1, 0, dummyDest); + + publishMessagesWithOrderProperty(5, 30, dest); consumeVerifyOrderRollback(20); consumeVerifyOrderRollback(10); @@ -55,48 +70,105 @@ public class QueueOrderSingleTransactedConsumerTest { @Test public void testSingleSessionXConsumerTxRepeat() throws Exception { - publishMessages(100); + publishMessagesWithOrderProperty(50); + + Connection connection = getConnectionFactory().createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer messageConsumer = consumeVerifyOrder(session, 20); + messageConsumer.close(); + session.rollback(); + messageConsumer = consumeVerifyOrder(session, 10); + messageConsumer.close(); + session.rollback(); + messageConsumer = consumeVerifyOrder(session, 5); + messageConsumer.close(); + session.commit(); + connection.close(); + } + + @Test + public void tesXConsumerTxRepeat() throws Exception { + + publishMessagesWithOrderProperty(10); - Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection(); + Connection connection = getConnectionFactory().createConnection(); connection.start(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - consumeVerifyOrder(session, 20); + MessageConsumer messageConsumer = consumeVerifyOrder(session, 6); + messageConsumer.close(); + messageConsumer = consumeVerifyOrder(session, 4, 6); + + // rollback before close, so there are two consumers in the mix session.rollback(); - consumeVerifyOrder(session, 10); + + messageConsumer.close(); + + messageConsumer = consumeVerifyOrder(session, 10); + session.commit(); + messageConsumer.close(); + connection.close(); + } + + @Test + public void testSingleTxXConsumerTxRepeat() throws Exception { + + publishMessagesWithOrderProperty(10); + + Connection connection = getConnectionFactory().createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer messageConsumer = consumeVerifyOrder(session, 6); + messageConsumer.close(); + messageConsumer = consumeVerifyOrder(session, 4, 6); + messageConsumer.close(); + session.rollback(); - consumeVerifyOrder(session, 5); + messageConsumer = consumeVerifyOrder(session, 10); session.commit(); + messageConsumer.close(); + connection.close(); } private void consumeVerifyOrderRollback(final int num) throws Exception { - Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection(); + Connection connection = getConnectionFactory().createConnection(); connection.start(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - consumeVerifyOrder(session, num); + MessageConsumer messageConsumer = consumeVerifyOrder(session, num); + messageConsumer.close(); session.rollback(); connection.close(); } - private void consumeVerifyOrder(Session session, final int num) throws Exception { + private MessageConsumer consumeVerifyOrder(Session session, final int num) throws Exception { + return consumeVerifyOrder(session, num, 0); + } + + private MessageConsumer consumeVerifyOrder(Session session, final int num, final int base) throws Exception { MessageConsumer messageConsumer = session.createConsumer(dest); for (int i=0; i<num; ) { Message message = messageConsumer.receive(4000); if (message != null) { - assertEquals(i, message.getIntProperty("Order")); + assertEquals(i + base, message.getIntProperty("Order")); i++; + LOG.debug("Received:" + message.getJMSMessageID() + ", Order: " + message.getIntProperty("Order")); } } - messageConsumer.close(); + return messageConsumer; } - private void publishMessages(int num) throws Exception { - Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection(); + private void publishMessagesWithOrderProperty(int num) throws Exception { + publishMessagesWithOrderProperty(num, 0, dest); + } + + private void publishMessagesWithOrderProperty(int num, int seqStart, ActiveMQQueue destination) throws Exception { + Connection connection = getConnectionFactory().createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer messageProducer = session.createProducer(dest); + MessageProducer messageProducer = session.createProducer(destination); TextMessage textMessage = session.createTextMessage("A"); for (int i=0; i<num; i++) { - textMessage.setIntProperty("Order", i); + textMessage.setIntProperty("Order", i + seqStart); messageProducer.send(textMessage); } } @@ -130,4 +202,10 @@ public class QueueOrderSingleTransactedConsumerTest { broker.stop(); } } + + + private ActiveMQConnectionFactory getConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); + } + }
