ARTEMIS-856 Fixing MessageRedistributionTest
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ec742cb8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ec742cb8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ec742cb8

Branch: refs/heads/master
Commit: ec742cb8899bd1b7d7cb28b7e64c87f75681f6e7
Parents: 59520b9
Author: Michael André Pearce <[email protected]>
Authored: Thu Aug 2 10:51:27 2018 +0100
Committer: Michael André Pearce <[email protected]>
Committed: Thu Aug 2 13:52:32 2018 +0100

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java | 37 ++++++++++++++------
 1 file changed, 27 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec742cb8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2656217..09ff210 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -21,6 +21,7 @@ import java.io.StringWriter;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -1082,6 +1083,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             supports = false;
          }
       }
+      if (redistributor != null) {
+         if (!redistributor.consumer.supportsDirectDelivery()) {
+            supports = false;
+         }
+      }
       return supports;
    }
 
@@ -2158,10 +2164,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    @Override
    public synchronized void resetAllIterators() {
       for (ConsumerHolder holder : this.consumerList) {
-         if (holder.iter != null) {
-            holder.iter.close();
-         }
-         holder.iter = null;
+         holder.resetIterator();
+      }
+      if (redistributor != null) {
+         redistributor.resetIterator();
       }
    }
 
@@ -2369,7 +2375,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          MessageReference ref;
 
          Consumer handledconsumer = null;
-
+         SimpleString groupID;
          synchronized (this) {
 
             // Need to do these checks inside the synchronized
@@ -2436,7 +2442,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
                // If a group id is set, then this overrides the consumer chosen round-robin
 
-               SimpleString groupID = extractGroupID(ref);
+               groupID = extractGroupID(ref);
 
                if (groupID != null) {
                   groupConsumer = groups.get(groupID);
@@ -2484,10 +2490,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                }
             }
 
-            if (pos == endPos) {
+            if (pos == endPos || (redistributor != null || groupConsumer != null || exclusive)) {
                // Round robin'd all
 
-               if (noDelivery == size) {
+               if (noDelivery == size && redistributor == null || ((redistributor != null || groupConsumer != null || exclusive) && noDelivery > 0)) {
                   if (handledconsumer != null) {
                      // this shouldn't really happen,
                      // however I'm keeping this as an assertion case future developers ever change the logic here on this class
@@ -3040,7 +3046,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                return true;
             }
 
-            if (pos == startPos) {
+            if (pos == startPos || redistributor != null || groupConsumer != null || exclusive) {
                // Tried them all
                break;
             }
@@ -3122,7 +3128,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       List<ConsumerHolder> consumerListClone;
 
       synchronized (this) {
-         consumerListClone = new ArrayList<>(consumerList);
+         if (redistributor == null) {
+            consumerListClone = new ArrayList<>(consumerList);
+         } else {
+            consumerListClone = Collections.singletonList(redistributor);
+         }
       }
       return consumerListClone;
    }
@@ -3286,6 +3296,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       LinkedListIterator<MessageReference> iter;
 
+      private void resetIterator() {
+         if (iter != null) {
+            iter.close();
+         }
+         iter = null;
+      }
+
    }
 
    private class DelayedAddRedistributor implements Runnable {
