ARTEMIS-1868 Openwire doesn't add delivery count in client ack mode If a client ack mode consumer receives a message and closes without acking it, the redelivery of the message won't set the redelivery flag (JMSRedelivered) because it doesn't increment the delivery count when message is cancelled back to queue.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/47b31b53 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/47b31b53 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/47b31b53 Branch: refs/heads/2.6.x Commit: 47b31b53d608f762b9c38e924c5a9f8b92f384b5 Parents: 9eed307 Author: Howard Gao <[email protected]> Authored: Wed May 16 11:14:48 2018 +0800 Committer: Clebert Suconic <[email protected]> Committed: Mon May 21 18:00:08 2018 -0400 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 1 + .../core/protocol/openwire/amq/AMQConsumer.java | 4 +- .../core/protocol/openwire/amq/AMQSession.java | 14 ++++++ .../openwire/amq/RedeliveryPolicyTest.java | 52 ++++++++++++++++++++ 4 files changed, 70 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 6a10de7..f666785 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1294,6 +1294,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se referenceIterator.remove(); ref.incrementDeliveryCount(); consumer.backToDelivering(ref); + session.addRolledback(ref.getMessageID()); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index e0b02ae..0b7eff5 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -353,7 +353,7 @@ public class AMQConsumer { } public boolean updateDeliveryCountAfterCancel(MessageReference ref) { - long seqId = ref.getMessage().getMessageID(); + long seqId = ref.getMessageID(); long lastDelSeqId = info.getLastDeliveredSequenceId(); //in activemq5, closing a durable subscription won't close the consumer @@ -373,6 +373,8 @@ public class AMQConsumer { // tx cases are handled by // org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction() ref.incrementDeliveryCount(); + } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !session.isRolledBack(seqId)) { + ref.incrementDeliveryCount(); } return true; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index c3b1a20..34e2c0f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -20,6 +20,7 @@ import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUt import java.io.IOException; import java.util.List; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -52,6 +53,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; +import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; @@ -95,6 +97,8 @@ public class AMQSession implements SessionCallback { private final SimpleString clientId; + private final Set<Long> rollbackedIds = new ConcurrentHashSet<>(); + public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo, ActiveMQServer server, @@ -308,6 +312,8 @@ public class AMQSession implements SessionCallback { ServerConsumer consumer, int deliveryCount) { AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData(); + //clear up possible rolledback ids. + rollbackedIds.remove(message.getMessageID()); // TODO: use encoders and proper conversions here return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount); } @@ -542,4 +548,12 @@ public class AMQSession implements SessionCallback { public boolean isInternal() { return sessInfo.getSessionId().getValue() == -1; } + + public void addRolledback(long messageID) { + this.rollbackedIds.add(messageID); + } + + public boolean isRolledBack(long mid) { + return rollbackedIds.remove(mid); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java index 7ee0eb9..3e50cc7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java @@ -21,9 +21,11 @@ import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; @@ -633,4 +635,54 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest { session.commit(); } + @Test + public void testClientRedlivery() throws Exception { + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + this.makeSureCoreQueueExist("TEST"); + + Queue queue = session.createQueue("TEST"); + + MessageProducer producer = session.createProducer(queue); + + producer.send(session.createTextMessage("test")); + + } finally { + connection.close(); + } + + for (int i = 0; i < 10; ++i) { + + connection = (ActiveMQConnection) factory.createConnection(); + + connection.start(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Queue queue = session.createQueue("TEST"); + + MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(1000); + + assertNotNull("Message null on iteration " + i, message); + + System.out.println("received message: " + i); + System.out.println("is redelivered: " + message.getJMSRedelivered()); + if (i > 0) { + assertTrue(message.getJMSRedelivered()); + } + + } finally { + connection.close(); + } + } + + } + }
