ARTEMIS-1868 Openwire doesn't add delivery count in client ack mode If a client ack mode consumer receives a message and closes without acking it, the redelivery of the message won't set the redelivery flag (JMSRedelivered) because it doesn't increment the delivery count when message is cancelled back to queue. (Perf improvement)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f24d97bf Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f24d97bf Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f24d97bf Branch: refs/heads/2.6.x Commit: f24d97bfd11b44c4ac7e672a1ec089ea9db9422a Parents: 47b31b5 Author: Francesco Nigro <[email protected]> Authored: Wed May 16 11:33:24 2018 +0200 Committer: Clebert Suconic <[email protected]> Committed: Mon May 21 18:02:40 2018 -0400 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 3 +- .../core/protocol/openwire/amq/AMQConsumer.java | 51 +++++++++++++++++++- .../core/protocol/openwire/amq/AMQSession.java | 15 +----- 3 files changed, 52 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f24d97bf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index f666785..21b2d46 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1294,7 +1294,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se referenceIterator.remove(); ref.incrementDeliveryCount(); consumer.backToDelivering(ref); - session.addRolledback(ref.getMessageID()); + final AMQConsumer amqConsumer = (AMQConsumer) consumer.getProtocolData(); + amqConsumer.addRolledback(ref); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f24d97bf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 0b7eff5..7e9881b 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -17,8 +17,11 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq; import java.io.IOException; +import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -68,6 +71,7 @@ public class AMQConsumer { //internal means we don't expose //it's address/queue to management service private boolean internalAddress = false; + private volatile Set<MessageReference> rolledbackMessageRefs; public AMQConsumer(AMQSession amqSession, org.apache.activemq.command.ActiveMQDestination d, @@ -85,6 +89,30 @@ public class AMQConsumer { messagePullHandler = new MessagePullHandler(); } this.internalAddress = internalAddress; + this.rolledbackMessageRefs = null; + } + + private Set<MessageReference> guardedInitializationOfRolledBackMessageRefs() { + synchronized (this) { + Set<MessageReference> rollbackedMessageRefs = this.rolledbackMessageRefs; + if (rollbackedMessageRefs == null) { + rollbackedMessageRefs = new ConcurrentSkipListSet<>(Comparator.comparingLong(MessageReference::getMessageID)); + this.rolledbackMessageRefs = rollbackedMessageRefs; + } + return rollbackedMessageRefs; + } + } + + private Set<MessageReference> getRolledbackMessageRefsOrCreate() { + Set<MessageReference> rolledbackMessageRefs = this.rolledbackMessageRefs; + if (rolledbackMessageRefs == null) { + rolledbackMessageRefs = guardedInitializationOfRolledBackMessageRefs(); + } + return rolledbackMessageRefs; + } + + private Set<MessageReference> getRolledbackMessageRefs() { + return this.rolledbackMessageRefs; } public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception { @@ -353,7 +381,6 @@ public class AMQConsumer { } public boolean updateDeliveryCountAfterCancel(MessageReference ref) { - long seqId = ref.getMessageID(); long lastDelSeqId = info.getLastDeliveredSequenceId(); //in activemq5, closing a durable subscription won't close the consumer @@ -373,7 +400,7 @@ public class AMQConsumer { // tx cases are handled by // org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction() ref.incrementDeliveryCount(); - } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !session.isRolledBack(seqId)) { + } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !isRolledBack(ref)) { ref.incrementDeliveryCount(); } @@ -432,4 +459,24 @@ public class AMQConsumer { } } } + + public boolean removeRolledback(MessageReference messageReference) { + final Set<MessageReference> rolledbackMessageRefs = getRolledbackMessageRefs(); + if (rolledbackMessageRefs == null) { + return false; + } + return rolledbackMessageRefs.remove(messageReference); + } + + public void addRolledback(MessageReference messageReference) { + getRolledbackMessageRefsOrCreate().add(messageReference); + } + + private boolean isRolledBack(MessageReference messageReference) { + final Set<MessageReference> rollbackedMessageRefs = getRolledbackMessageRefs(); + if (rollbackedMessageRefs == null) { + return false; + } + return rollbackedMessageRefs.contains(messageReference); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f24d97bf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 34e2c0f..0250f1c 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -20,7 +20,6 @@ import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUt import java.io.IOException; import java.util.List; -import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -53,7 +52,6 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; -import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; @@ -97,8 +95,6 @@ public class AMQSession implements SessionCallback { private final SimpleString clientId; - private final Set<Long> rollbackedIds = new ConcurrentHashSet<>(); - public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo, ActiveMQServer server, @@ -313,8 +309,7 @@ public class AMQSession implements SessionCallback { int deliveryCount) { AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData(); //clear up possible rolledback ids. - rollbackedIds.remove(message.getMessageID()); - // TODO: use encoders and proper conversions here + theConsumer.removeRolledback(reference); return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount); } @@ -548,12 +543,4 @@ public class AMQSession implements SessionCallback { public boolean isInternal() { return sessInfo.getSessionId().getValue() == -1; } - - public void addRolledback(long messageID) { - this.rollbackedIds.add(messageID); - } - - public boolean isRolledBack(long mid) { - return rollbackedIds.remove(mid); - } }
