This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new da4f95c ARTEMIS-2305 ACK counters to only increment after commit
new 3589dd7 This closes #2618
da4f95c is described below
commit da4f95cf714317d956ed3aed11dd39e2c4e39368
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Apr 2 17:17:23 2019 -0400
ARTEMIS-2305 ACK counters to only increment after commit
Also including a new metric for ack attempts that will keep the former
semantics.
---
.../apache/activemq/artemis/logs/AuditLogger.java | 10 +++
.../artemis/api/core/management/QueueControl.java | 7 ++
.../proton/transaction/ProtonTransactionImpl.java | 5 +-
.../ProtonTransactionRefsOperation.java | 5 +-
.../core/management/impl/QueueControlImpl.java | 15 ++++
.../apache/activemq/artemis/core/server/Queue.java | 4 +-
.../artemis/core/server/impl/QueueImpl.java | 59 ++++++++-------
.../artemis/core/server/impl/RefsOperation.java | 8 +-
.../artemis/core/transaction/Transaction.java | 3 +-
.../transaction/impl/BindingsTransactionImpl.java | 3 +-
.../core/transaction/impl/TransactionImpl.java | 5 +-
.../server/impl/ScheduledDeliveryHandlerTest.java | 7 +-
.../client/InterruptedLargeMessageTest.java | 3 +-
.../tests/integration/jmx/JmxConnectionTest.java | 2 +-
.../integration/management/QueueControlTest.java | 88 ++++++++++++++++++++++
.../management/QueueControlUsingCoreTest.java | 5 ++
.../core/postoffice/impl/BindingsImplTest.java | 3 +-
.../tests/unit/core/postoffice/impl/FakeQueue.java | 7 +-
18 files changed, 196 insertions(+), 43 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 5bf8bea..a07608e 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2252,4 +2252,14 @@ public interface AuditLogger extends BasicLogger {
@Message(id = 601500, value = "User {0} is sending a core message on target
resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void coreSendMessage(String user, Object source, Object... args);
+
+ static void getAcknowledgeAttempts(Object source) { // audits a read of the acknowledge-attempts attribute
+ LOGGER.getAcknowledgeAttempts(getCaller(), source); // emits message 601501 declared below
+ }
+
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 601501, value = "User {0} is getting messages acknowledged
attemps on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
+ void getAcknowledgeAttempts(String user, Object source, Object... args);
+
+
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index b57be21..0d3ab0f 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -169,6 +169,13 @@ public interface QueueControl {
long getMessagesAcknowledged();
/**
+ * Returns the number of acknowledge attempts made on this queue since it was created.
+ */
+ @Attribute(desc = "number of messages acknowledged attempts from this queue
since it was created")
+ long getAcknowledgeAttempts();
+
+
+ /**
* Returns the number of messages expired from this queue since it was
created.
*/
@Attribute(desc = "number of messages expired from this queue since it was
created")
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
index 4c5a887..123dbb5 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.transaction.Transaction;
import
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
@@ -67,8 +68,8 @@ public class ProtonTransactionImpl extends TransactionImpl {
}
@Override
- public RefsOperation createRefsOperation(Queue queue) {
- return new ProtonTransactionRefsOperation(queue, storageManager);
+ public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
+ return new ProtonTransactionRefsOperation(queue, reason, storageManager);
}
@Override
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java
index 7b48ac0..4bb00d2 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionRefsOperation.java
@@ -23,6 +23,7 @@ import
org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -36,8 +37,8 @@ import org.apache.qpid.proton.engine.Delivery;
*/
public class ProtonTransactionRefsOperation extends RefsOperation {
- public ProtonTransactionRefsOperation(final Queue queue, StorageManager
storageManager) {
- super(queue, storageManager);
+ public ProtonTransactionRefsOperation(final Queue queue, AckReason reason,
StorageManager storageManager) {
+ super(queue, reason, storageManager);
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 6270fe5..b789347 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -401,6 +401,21 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
}
@Override
+ public long getAcknowledgeAttempts() { // management read of the queue's acknowledge-attempt counter
+ if (AuditLogger.isEnabled()) {
+ AuditLogger.getAcknowledgeAttempts(queue); // audit via the helper named for this attribute
+ }
+ checkStarted();
+
+ clearIO();
+ try {
+ return queue.getAcknowledgeAttempts();
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
public long getMessagesExpired() {
if (AuditLogger.isEnabled()) {
AuditLogger.getMessagesExpired(queue);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 2fcd766..b722b5f 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -234,6 +234,8 @@ public interface Queue extends Bindable,CriticalComponent {
long getMessagesAdded();
+ long getAcknowledgeAttempts();
+
long getMessagesAcknowledged();
long getMessagesExpired();
@@ -393,7 +395,7 @@ public interface Queue extends Bindable,CriticalComponent {
*/
void deliverScheduledMessages() throws ActiveMQException;
- void postAcknowledge(MessageReference ref);
+ void postAcknowledge(MessageReference ref, AckReason reason);
float getRate();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 9d04f5b..44f938e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -193,6 +193,8 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
private AtomicLong messagesAcknowledged = new AtomicLong(0);
+ private AtomicLong ackAttempts = new AtomicLong(0);
+
private AtomicLong messagesExpired = new AtomicLong(0);
private AtomicLong messagesKilled = new AtomicLong(0);
@@ -1473,7 +1475,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
} else {
if (ref.isPaged()) {
pageSubscription.ack((PagedReference) ref);
- postAcknowledge(ref);
+ postAcknowledge(ref, reason);
} else {
Message message = ref.getMessage();
@@ -1482,18 +1484,10 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
if (durableRef) {
storageManager.storeAcknowledge(id, message.getMessageID());
}
- postAcknowledge(ref);
+ postAcknowledge(ref, reason);
}
- if (reason == AckReason.EXPIRED) {
- messagesExpired.incrementAndGet();
- } else if (reason == AckReason.KILLED) {
- messagesKilled.incrementAndGet();
- } else if (reason == AckReason.REPLACED) {
- messagesReplaced.incrementAndGet();
- } else {
- messagesAcknowledged.incrementAndGet();
- }
+ ackAttempts.incrementAndGet();
if (server != null && server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin ->
plugin.messageAcknowledged(ref, reason, consumer));
@@ -1508,10 +1502,12 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
@Override
public void acknowledge(final Transaction tx, final MessageReference ref,
final AckReason reason, final ServerConsumer consumer) throws Exception {
+ RefsOperation refsOperation = getRefsOperation(tx, reason);
+
if (ref.isPaged()) {
pageSubscription.ackTx(tx, (PagedReference) ref);
- getRefsOperation(tx).addAck(ref);
+ refsOperation.addAck(ref);
} else {
Message message = ref.getMessage();
@@ -1523,15 +1519,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
tx.setContainsPersistent();
}
- getRefsOperation(tx).addAck(ref);
- }
+ ackAttempts.incrementAndGet();
- if (reason == AckReason.EXPIRED) {
- messagesExpired.incrementAndGet();
- } else if (reason == AckReason.KILLED) {
- messagesKilled.incrementAndGet();
- } else {
- messagesAcknowledged.incrementAndGet();
+ refsOperation.addAck(ref);
}
if (server != null && server.hasBrokerMessagePlugins()) {
@@ -1547,7 +1537,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
tx.setContainsPersistent();
}
- getRefsOperation(tx).addAck(ref);
+ getRefsOperation(tx, AckReason.NORMAL).addAck(ref);
// https://issues.jboss.org/browse/HORNETQ-609
incDelivering(ref);
@@ -1555,16 +1545,16 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
messagesAcknowledged.incrementAndGet();
}
- private RefsOperation getRefsOperation(final Transaction tx) {
- return getRefsOperation(tx, false);
+ private RefsOperation getRefsOperation(final Transaction tx, AckReason
ackReason) {
+ return getRefsOperation(tx, ackReason, false);
}
- private RefsOperation getRefsOperation(final Transaction tx, boolean
ignoreRedlieveryCheck) {
+ private RefsOperation getRefsOperation(final Transaction tx, AckReason
ackReason, boolean ignoreRedlieveryCheck) {
synchronized (tx) {
RefsOperation oper = (RefsOperation)
tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
if (oper == null) {
- oper = tx.createRefsOperation(this);
+ oper = tx.createRefsOperation(this, ackReason);
tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
@@ -1586,7 +1576,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
@Override
public void cancel(final Transaction tx, final MessageReference reference,
boolean ignoreRedeliveryCheck) {
- getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
+ getRefsOperation(tx, AckReason.NORMAL,
ignoreRedeliveryCheck).addAck(reference);
}
@Override
@@ -1706,6 +1696,11 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
@Override
+ public long getAcknowledgeAttempts() {
+ return ackAttempts.get();
+ }
+
+ @Override
public long getMessagesExpired() {
return messagesExpired.get();
}
@@ -3300,11 +3295,21 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
@Override
- public void postAcknowledge(final MessageReference ref) {
+ public void postAcknowledge(final MessageReference ref, AckReason reason) {
QueueImpl queue = (QueueImpl) ref.getQueue();
queue.decDelivering(ref);
+ if (reason == AckReason.EXPIRED) {
+ messagesExpired.incrementAndGet();
+ } else if (reason == AckReason.KILLED) {
+ messagesKilled.incrementAndGet();
+ } else if (reason == AckReason.REPLACED) {
+ messagesReplaced.incrementAndGet();
+ } else {
+ messagesAcknowledged.incrementAndGet();
+ }
+
if (ref.isPaged()) {
// nothing to be done
return;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 6d21be2..de52cc4 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -38,6 +38,8 @@ public class RefsOperation extends
TransactionOperationAbstract {
private static final Logger logger = Logger.getLogger(RefsOperation.class);
+ private final AckReason reason;
+
private final StorageManager storageManager;
private Queue queue;
List<MessageReference> refsToAck = new ArrayList<>();
@@ -50,11 +52,13 @@ public class RefsOperation extends
TransactionOperationAbstract {
*/
protected boolean ignoreRedeliveryCheck = false;
- public RefsOperation(Queue queue, StorageManager storageManager) {
+ public RefsOperation(Queue queue, AckReason reason, StorageManager
storageManager) {
this.queue = queue;
+ this.reason = reason;
this.storageManager = storageManager;
}
+
// once turned on, we shouldn't turn it off, that's why no parameters
public void setIgnoreRedeliveryCheck() {
ignoreRedeliveryCheck = true;
@@ -163,7 +167,7 @@ public class RefsOperation extends
TransactionOperationAbstract {
public void afterCommit(final Transaction tx) {
for (MessageReference ref : refsToAck) {
synchronized (ref.getQueue()) {
- queue.postAcknowledge(ref);
+ queue.postAcknowledge(ref, reason);
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
index 0ddc2cb..6fa2c5f 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
/**
@@ -95,5 +96,5 @@ public interface Transaction {
void setTimeout(int timeout);
- RefsOperation createRefsOperation(Queue queue);
+ RefsOperation createRefsOperation(Queue queue, AckReason reason);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/BindingsTransactionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/BindingsTransactionImpl.java
index 50dff64..5bb1acc 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/BindingsTransactionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/BindingsTransactionImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.transaction.impl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
public class BindingsTransactionImpl extends TransactionImpl {
@@ -45,7 +46,7 @@ public class BindingsTransactionImpl extends TransactionImpl {
}
@Override
- public RefsOperation createRefsOperation(Queue queue) {
+ public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
return null;
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 9ef1554..d459975 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
@@ -162,8 +163,8 @@ public class TransactionImpl implements Transaction {
}
@Override
- public RefsOperation createRefsOperation(Queue queue) {
- return new RefsOperation(queue, storageManager);
+ public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
+ return new RefsOperation(queue, reason, storageManager);
}
@Override
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 40ab28e..d241786 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -794,6 +794,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public long getAcknowledgeAttempts() {
+ return 0;
+ }
+
+ @Override
public boolean allowsReferenceCallback() {
return false;
}
@@ -1477,7 +1482,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public void postAcknowledge(MessageReference ref) {
+ public void postAcknowledge(MessageReference ref, AckReason reason) {
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
index 6056fcb..1218852 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
@@ -52,6 +52,7 @@ import
org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -523,7 +524,7 @@ public class InterruptedLargeMessageTest extends
LargeMessageTestBase {
}
@Override
- public void postAcknowledge(final MessageReference ref) {
+ public void postAcknowledge(final MessageReference ref, AckReason
reason) {
System.out.println("Ignoring postACK on message " + ref);
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jmx/JmxConnectionTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jmx/JmxConnectionTest.java
index 9fd2c3a..416dfcc 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jmx/JmxConnectionTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jmx/JmxConnectionTest.java
@@ -105,7 +105,7 @@ public class JmxConnectionTest extends ActiveMQTestBase {
logAndSystemOut("Successfully connected to: " + urlString);
} catch (Exception e) {
logAndSystemOut("JMX connection failed: " + urlString, e);
- Assert.fail();
+ Assert.fail(e.getMessage());
return;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 4dd95db..4fa03b2 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -36,6 +36,7 @@ import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.management.Notification;
import javax.management.openmbean.CompositeData;
+import javax.transaction.xa.XAResource;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -66,6 +67,7 @@ import
org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.junit.Wait;
import
org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
import org.apache.activemq.artemis.utils.Base64;
@@ -371,6 +373,92 @@ public class QueueControlTest extends ManagementTestBase {
}
@Test
+ public void testGetMessagesAcknowledgedOnXARollback() throws Exception {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, RoutingType.MULTICAST, queue, null,
durable);
+
+ QueueControl queueControl = createManagementControl(address, queue);
+ Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+
+ ClientProducer producer = session.createProducer(address);
+ producer.send(session.createMessage(durable));
+
+ ClientSessionFactory xaFactory = createSessionFactory(locator);
+ ClientSession xaSession = addClientSession(xaFactory.createSession(true,
false, false));
+ xaSession.start();
+
+ ClientConsumer consumer = xaSession.createConsumer(queue);
+
+ int tries = 10;
+ for (int i = 0; i < tries; i++) {
+ XidImpl xid = newXID();
+ xaSession.start(xid, XAResource.TMNOFLAGS);
+ ClientMessage message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ message.acknowledge();
+ Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+ xaSession.end(xid, XAResource.TMSUCCESS);
+ Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+ xaSession.prepare(xid);
+ Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+ if (i + 1 == tries) {
+ xaSession.commit(xid, false);
+ } else {
+ xaSession.rollback(xid);
+ }
+ }
+
+ Wait.assertEquals(1, queueControl::getMessagesAcknowledged);
+ Wait.assertEquals(10, queueControl::getAcknowledgeAttempts);
+
+ consumer.close();
+
+ session.deleteQueue(queue);
+ }
+
+ @Test
+ public void testGetMessagesAcknowledgedOnRegularRollback() throws Exception
{
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, RoutingType.MULTICAST, queue, null,
durable);
+
+ QueueControl queueControl = createManagementControl(address, queue);
+ Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+
+ ClientProducer producer = session.createProducer(address);
+ producer.send(session.createMessage(durable));
+
+ ClientSessionFactory xaFactory = createSessionFactory(locator);
+ ClientSession txSession =
addClientSession(xaFactory.createSession(false, false, false));
+ txSession.start();
+
+ ClientConsumer consumer = txSession.createConsumer(queue);
+
+ int tries = 10;
+ for (int i = 0; i < tries; i++) {
+ ClientMessage message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ message.acknowledge();
+ Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+ if (i + 1 == tries) {
+ txSession.commit();
+ } else {
+ txSession.rollback();
+ }
+ }
+
+ Wait.assertEquals(1, queueControl::getMessagesAcknowledged);
+ Wait.assertEquals(10, queueControl::getAcknowledgeAttempts);
+
+ consumer.close();
+
+ session.deleteQueue(queue);
+ }
+
+ @Test
public void testGetScheduledCount() throws Exception {
long delay = 500;
SimpleString address = RandomUtil.randomSimpleString();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index c40f655..254a287 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -217,6 +217,11 @@ public class QueueControlUsingCoreTest extends
QueueControlTest {
}
@Override
+ public long getAcknowledgeAttempts() {
+ return (Integer)
proxy.retrieveAttributeValue("acknowledgeAttempts", Integer.class);
+ }
+
+ @Override
public long getMessagesExpired() {
return (Long) proxy.retrieveAttributeValue("messagesExpired",
Long.class);
}
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index 01a568b..591f54a 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -35,6 +35,7 @@ import
org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -251,7 +252,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
}
@Override
- public RefsOperation createRefsOperation(Queue queue) {
+ public RefsOperation createRefsOperation(Queue queue, AckReason reason) {
// TODO Auto-generated method stub
return null;
}
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 0aa631b..9d43b26 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -207,6 +207,11 @@ public class FakeQueue extends CriticalComponentImpl
implements Queue {
}
@Override
+ public long getAcknowledgeAttempts() {
+ return 0;
+ }
+
+ @Override
public void cancel(Transaction tx, MessageReference ref, boolean
ignoreRedeliveryCheck) {
// no-op
}
@@ -841,7 +846,7 @@ public class FakeQueue extends CriticalComponentImpl
implements Queue {
}
@Override
- public void postAcknowledge(MessageReference ref) {
+ public void postAcknowledge(MessageReference ref, AckReason reason) {
}
@Override