Repository: activemq Updated Branches: refs/heads/master 6cf8bed0c -> f47b37057
https://issues.apache.org/jira/browse/AMQ-6286 extend strictOrderDispatch to retain order of redispatched messages for a single consumer Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f47b3705 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f47b3705 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f47b3705 Branch: refs/heads/master Commit: f47b370573d5a2bcb5a000c1adce0bbf7d40f5b1 Parents: 6cf8bed Author: gtully <[email protected]> Authored: Mon May 9 23:03:53 2016 +0100 Committer: gtully <[email protected]> Committed: Mon May 9 23:06:19 2016 +0100 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 8 +- .../region/cursors/OrderedPendingList.java | 28 +++++ .../broker/region/cursors/PendingList.java | 3 + .../region/cursors/PrioritizedPendingList.java | 11 ++ .../cursors/QueueDispatchPendingList.java | 21 +++- .../region/cursors/OrderPendingListTest.java | 56 ++++++++++ .../QueueOrderSingleTransactedConsumerTest.java | 112 +++++++++++++++++++ 7 files changed, 232 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index af37bdc..0d06022 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -556,7 +556,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } - for (MessageReference ref : unAckedMessages) { + for (Iterator<MessageReference> unackedListIterator = unAckedMessages.iterator(); unackedListIterator.hasNext(); ) { + MessageReference ref = unackedListIterator.next(); // AMQ-5107: don't resend if the broker is shutting down if ( this.brokerService.isStopping() ) { break; @@ -578,10 +579,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } } - if (!qmr.isDropped()) { - dispatchPendingList.addMessageForRedelivery(qmr); + if (qmr.isDropped()) { + unackedListIterator.remove(); } } + dispatchPendingList.addForRedelivery(unAckedMessages, strictOrderDispatch && consumers.isEmpty()); if (sub instanceof QueueBrowserSubscription) { ((QueueBrowserSubscription)sub).decrementQueueRef(); browserDispatches.remove(sub); http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java index 71b7212..d0e4c47 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java @@ -198,4 +198,32 @@ public class OrderedPendingList implements PendingList { } return null; } + + @Override + public void insertAtHead(List<MessageReference> list) { + if (list != null && !list.isEmpty()) { + PendingNode newHead = null; + PendingNode appendNode = null; + for (MessageReference ref : list) { + PendingNode node = new PendingNode(this, ref); + pendingMessageHelper.addToMap(ref, node); + if (newHead == null) { + newHead = node; + appendNode = node; + continue; + } + appendNode.linkAfter(node); + appendNode = node; + } + // insert this new list at root + if (root == null) { + root = newHead; + tail = appendNode; + } else { + appendNode.linkAfter(root); + root = newHead; + } + } + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java index adfa78e..7cc78b8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.cursors; import java.util.Collection; import java.util.Iterator; +import java.util.List; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.MessageId; @@ -114,4 +115,6 @@ public interface PendingList extends Iterable<MessageReference> { public void addAll(PendingList pendingList); public MessageReference get(MessageId messageId); + + public void insertAtHead(List<MessageReference> list); } http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java index 8a9bb17..cd62081 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Deque; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import org.apache.activemq.broker.region.MessageReference; @@ -205,4 +206,14 @@ public class PrioritizedPendingList implements PendingList { return null; } + @Override + public void insertAtHead(List<MessageReference> list) { + // behave like addAll - pure order within priority lists is not required + if (list != null) { + for (MessageReference ref: list) { + addMessageLast(ref); + } + } + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java index 788b5e5..5aff9b3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java @@ -193,6 +193,11 @@ public class QueueDispatchPendingList implements PendingList { return rc; } + @Override + public void insertAtHead(List<MessageReference> list) { + throw new IllegalStateException("no insertion support in: " + this.getClass().getCanonicalName()); + } + public void setPrioritizedMessages(boolean prioritizedMessages) { prioritized = prioritizedMessages; if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) { @@ -204,11 +209,19 @@ public class QueueDispatchPendingList implements PendingList { } } - public void addMessageForRedelivery(QueueMessageReference qmr) { - redeliveredWaitingDispatch.addMessageLast(qmr); - } - public boolean hasRedeliveries(){ return !redeliveredWaitingDispatch.isEmpty(); } + + public void addForRedelivery(List<MessageReference> list, boolean noConsumers) { + if (noConsumers) { + // a single consumer can expect repeatable redelivery order irrespective + // of transaction or prefetch boundaries + redeliveredWaitingDispatch.insertAtHead(list); + } else { + for (MessageReference ref : list) { + redeliveredWaitingDispatch.addMessageLast(ref); + } + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java index 6a9dd6b..697581e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java @@ -20,9 +20,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; @@ -268,6 +271,53 @@ public class OrderPendingListTest { list.addAll(null); } + @Test + public void testInsertAtHead() throws Exception { + OrderedPendingList underTest = new OrderedPendingList(); + + TestPendingList source = new TestPendingList(); + source.addMessageLast(new TestMessageReference(1)); + source.addMessageLast(new TestMessageReference(2)); + source.addMessageLast(new TestMessageReference(3)); + source.addMessageLast(new TestMessageReference(4)); + source.addMessageLast(new TestMessageReference(5)); + + assertTrue(underTest.isEmpty()); + assertEquals(5, source.size()); + + LinkedList linkedList = new LinkedList(); + linkedList.addAll(source.values()); + underTest.insertAtHead(linkedList); + assertEquals(5, underTest.size()); + + underTest.insertAtHead(null); + + linkedList.clear(); + + Iterator<MessageReference> iterator = underTest.iterator(); + for (int i=0; i < 2 && iterator.hasNext(); i++ ) { + MessageReference ref = iterator.next(); + linkedList.addLast(ref); + iterator.remove(); + assertEquals(ref.getMessageId().getProducerSequenceId(), i + 1); + } + + assertEquals(3, underTest.size()); + + underTest.insertAtHead(linkedList); + assertEquals(5, underTest.size()); + + iterator = underTest.iterator(); + for (int i=0; iterator.hasNext(); i++ ) { + MessageReference ref = iterator.next(); + linkedList.addLast(ref); + iterator.remove(); + assertEquals(ref.getMessageId().getProducerSequenceId(), i + 1); + } + assertEquals(0, underTest.size()); + + } + static class TestPendingList implements PendingList { private final LinkedList<MessageReference> theList = new LinkedList<MessageReference>(); @@ -349,6 +399,12 @@ public class OrderPendingListTest { } return null; } + + @Override + public void insertAtHead(List<MessageReference> list) { + theList.addAll(list); + + } } static class TestMessageReference implements MessageReference { http://git-wip-us.apache.org/repos/asf/activemq/blob/f47b3705/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java new file mode 100644 index 0000000..4ef36bd --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.usecases; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class QueueOrderSingleTransactedConsumerTest { + + BrokerService broker = null; + ActiveMQQueue dest = new ActiveMQQueue("Queue"); + + @Test + public void testSingleConsumerTxRepeat() throws Exception { + + publishMessages(100); + + consumeVerifyOrderAndRollback(20); + consumeVerifyOrderAndRollback(10); + consumeVerifyOrderAndRollback(5); + } + + private void consumeVerifyOrderAndRollback(final int num) throws Exception { + Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer messageConsumer = session.createConsumer(dest); + for (int i=0; i<num; ) { + Message message = messageConsumer.receive(4000); + if (message != null) { + assertEquals(i, message.getIntProperty("Order")); + i++; + } + } + session.rollback(); + connection.close(); + } + + private void publishMessages(int num) throws Exception { + Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer messageProducer = session.createProducer(dest); + TextMessage textMessage = session.createTextMessage("A"); + for (int i=0; i<num; i++) { + textMessage.setIntProperty("Order", i); + messageProducer.send(textMessage); + } + } + + @Before + public void startBroker() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + + // add the policy entries + PolicyMap policyMap = new PolicyMap(); + List<PolicyEntry> entries = new ArrayList<PolicyEntry>(); + PolicyEntry pe = new PolicyEntry(); + pe.setExpireMessagesPeriod(0); + + pe.setQueuePrefetch(0); // make incremental dispatch to the consumers explicit + pe.setStrictOrderDispatch(true); // force redeliveries back to the head of the queue + + pe.setQueue(">"); + entries.add(pe); + policyMap.setPolicyEntries(entries); + broker.setDestinationPolicy(policyMap); + + broker.addConnector("tcp://0.0.0.0:0"); + broker.start(); + } + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } +}
