Fixed some redelivery tests
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2d6bed79 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2d6bed79 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2d6bed79 Branch: refs/heads/refactor-openwire Commit: 2d6bed7914a1d8d1cb271848b1f5f1f4b4de2da1 Parents: e8856b8 Author: Howard Gao <[email protected]> Authored: Fri Feb 26 22:24:03 2016 +0800 Committer: Clebert Suconic <[email protected]> Committed: Sun Feb 28 22:31:23 2016 -0500 ---------------------------------------------------------------------- .../openwire/amq/AMQServerConsumer.java | 22 ++++++++++++++++++++ .../protocol/openwire/amq/AMQServerSession.java | 7 +++++++ .../core/server/impl/ServerConsumerImpl.java | 16 ++++++++------ .../activemq/test/JmsTopicSendReceiveTest.java | 2 ++ 4 files changed, 41 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2d6bed79/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java index 3f94351..34789b0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java @@ -35,6 +35,7 @@ public class AMQServerConsumer extends ServerConsumerImpl { // TODO-NOW: remove this once unified AMQConsumer amqConsumer; + boolean isClosing; public AMQConsumer getAmqConsumer() { return amqConsumer; @@ -67,6 +68,18 @@ public class AMQServerConsumer extends ServerConsumerImpl { this.browserDeliverer = newBrowserDeliverer; } + public void closing() { + isClosing = true; + } + + @Override + public HandleStatus handle(final MessageReference ref) throws Exception { + if (isClosing) { + return HandleStatus.BUSY; + } + return super.handle(ref); + } + private class AMQBrowserDeliverer extends BrowserDeliverer { private BrowserListener listener = null; @@ -174,4 +187,13 @@ public class AMQServerConsumer extends ServerConsumerImpl { } } + @Override + protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { + //activemq5 doesn't decrease the count + //when not failed. + if (failed) { + ref.decrementDeliveryCount(); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2d6bed79/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java index 5403830..390b58f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -90,6 +91,12 @@ public class AMQServerSession extends ServerSessionImpl { @Override protected void doClose(final boolean failed) throws Exception { + Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values()); + for (ServerConsumer consumer : consumersClone) { + AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer; + amqConsumer.closing();//prevent redeliver + } + synchronized (this) { if (tx != null && tx.getXid() == null) { ((AMQTransactionImpl) tx).setRollbackForClose(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2d6bed79/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index ab9dec9..08185db 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -556,12 +556,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } else { refs.add(ref); - if (!failed) { - // We don't decrement delivery count if the client failed, since there's a possibility that refs - // were actually delivered but we just didn't get any acks for them - // before failure - ref.decrementDeliveryCount(); - } + updateDeliveryCountForCanceledRef(ref, failed); } if (isTrace) { @@ -576,6 +571,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return refs; } + protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { + if (!failed) { + // We don't decrement delivery count if the client failed, since there's a possibility that refs + // were actually delivered but we just didn't get any acks for them + // before failure + ref.decrementDeliveryCount(); + } + } + @Override public void setStarted(final boolean started) { synchronized (lock) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2d6bed79/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java index 28ac25e..ddc6cd8 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java @@ -24,6 +24,7 @@ import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; +import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,6 +93,7 @@ public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport { session.close(); connection.close(); ArtemisBrokerHelper.stopArtemisBroker(); + TcpTransportFactory.clearService(); } /**
