This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new fc64026 ARTEMIS-2664 Simplify the credits acquiring.
new 89634e3 This closes #3029
fc64026 is described below
commit fc6402613d19b4f92fba89f54c9a47ed9d310458
Author: brusdev <[email protected]>
AuthorDate: Thu Mar 19 12:11:28 2020 +0100
ARTEMIS-2664 Simplify the credits acquiring.
Replace the AtomicInteger with a plain int. deliveredAcks is used only by
the acknowledge method, which is executed only by the EpollEventLoop thread
bound to the corresponding connection channel.
---
.../core/protocol/openwire/amq/AMQConsumer.java | 19 ++++++++-----------
1 file changed, 8 insertions(+), 11 deletions(-)
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index eb4ce40..cb9c74b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -68,7 +68,7 @@ public class AMQConsumer {
private int prefetchSize;
private final AtomicInteger currentWindow;
- private final AtomicInteger deliveredAcks;
+ private int deliveredAcks;
private long messagePullSequence = 0;
private final AtomicReference<MessagePullHandler> messagePullHandler = new
AtomicReference<>(null);
//internal means we don't expose
@@ -88,7 +88,7 @@ public class AMQConsumer {
this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize();
this.currentWindow = new AtomicInteger(prefetchSize);
- this.deliveredAcks = new AtomicInteger(0);
+ this.deliveredAcks = 0;
if (prefetchSize == 0) {
messagePullHandler.set(new MessagePullHandler());
}
@@ -300,18 +300,15 @@ public class AMQConsumer {
List<MessageReference> ackList =
serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first,
last);
if (removeReferences && (ack.isIndividualAck() || ack.isStandardAck() ||
ack.isPoisonAck())) {
- this.deliveredAcks.getAndUpdate(deliveredAcks -> {
- if (deliveredAcks >= ackList.size()) {
- return deliveredAcks - ackList.size();
- }
-
+ if (deliveredAcks < ackList.size()) {
acquireCredit(ackList.size() - deliveredAcks);
-
- return 0;
- });
+ deliveredAcks = 0;
+ } else {
+ deliveredAcks -= ackList.size();
+ }
} else {
if (ack.isDeliveredAck()) {
- this.deliveredAcks.addAndGet(ack.getMessageCount());
+ this.deliveredAcks += ack.getMessageCount();
}
acquireCredit(ack.getMessageCount());