https://issues.apache.org/jira/browse/AMQ-5266 - fix edge case with optimizedDispatch=true where a single message could be pending till the next page in event
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5861d86a Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5861d86a Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5861d86a Branch: refs/heads/trunk Commit: 5861d86ad39cac1644b1a48157bd6c799a586ac4 Parents: 26807cd Author: gtully <[email protected]> Authored: Thu Sep 11 16:59:50 2014 +0100 Committer: gtully <[email protected]> Committed: Thu Sep 11 16:59:50 2014 +0100 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 6 ++-- .../org/apache/activemq/bugs/AMQ5266Test.java | 29 ++++++++++++-------- 2 files changed, 20 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/5861d86a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index ff16dfc..c7f768e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -781,12 +781,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index sendLock.unlock(); } for (MessageContext messageContext : orderedUpdates) { - if (!messageContext.duplicate) { - messageSent(messageContext.context, messageContext.message); - } if (messageContext.onCompletion != null) { messageContext.onCompletion.run(); } + if (!messageContext.duplicate) { + messageSent(messageContext.context, messageContext.message); + } } orderedUpdates.clear(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/5861d86a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java index 626fe6e..efccefa 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java @@ -87,22 +87,26 @@ public class AMQ5266Test { @Parameterized.Parameter(5) public boolean useDefaultStore = false; - @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5}") + @Parameterized.Parameter(6) + public boolean optimizeDispatch = false; + + @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}") public static Iterable<Object[]> parameters() { return Arrays.asList(new Object[][]{ // jdbc - {1000, 20, 5, 50*1024, true, false}, - {100, 20, 5, 50*1024, false, false}, - {1000, 5, 20, 50*1024, true, false}, - {1000, 20, 20, 1024*1024, true, false}, - {1000, 100, 100, 1024*1024, true, false}, + {1, 1, 1, 50*1024, false, false, true}, + {1000, 20, 5, 50*1024, true, false, false}, + {100, 20, 5, 50*1024, false, false, false}, + {1000, 5, 20, 50*1024, true, false, false}, + {1000, 20, 20, 1024*1024, true, false, false}, // default store - {1000, 20, 5, 50*1024, true, true}, - {100, 20, 5, 50*1024, false, true}, - {1000, 5, 20, 50*1024, true, true}, - {1000, 20, 20, 1024*1024, true, true}, - {1000, 100, 100, 1024*1024, true, true} + {1, 1, 1, 50*1024, false, true, true}, + {100, 5, 5, 50*1024, false, true, false}, + {1000, 20, 5, 50*1024, true, true, false}, + {100, 20, 5, 50*1024, false, true, false}, + {1000, 5, 20, 50*1024, true, true, false}, + {1000, 20, 20, 1024*1024, true, true, false}, }); } @@ -127,6 +131,7 @@ public class AMQ5266Test { kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true); } brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setUseJmx(false); PolicyMap policyMap = new PolicyMap(); @@ -136,7 +141,7 @@ public class AMQ5266Test { defaultEntry.setEnableAudit(true); defaultEntry.setUseCache(useCache); defaultEntry.setMaxPageSize(1000); - defaultEntry.setOptimizedDispatch(false); + defaultEntry.setOptimizedDispatch(optimizeDispatch); defaultEntry.setMemoryLimit(destMemoryLimit); defaultEntry.setExpireMessagesPeriod(0); policyMap.setDefaultEntry(defaultEntry);
