Repository: activemq-artemis Updated Branches: refs/heads/master ec605e664 -> a242f2761
ARTEMIS-646 track expired msg count on queue Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/32abe618 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/32abe618 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/32abe618 Branch: refs/heads/master Commit: 32abe61876d388b6d1208de8f74968d86be24872 Parents: ec605e6 Author: jbertram <[email protected]> Authored: Tue Jul 26 16:06:01 2016 -0500 Committer: jbertram <[email protected]> Committed: Tue Jul 26 16:15:02 2016 -0500 ---------------------------------------------------------------------- .../api/core/management/QueueControl.java | 12 +++++ .../api/jms/management/JMSQueueControl.java | 6 +++ .../management/impl/JMSQueueControlImpl.java | 5 ++ .../core/management/impl/QueueControlImpl.java | 27 +++++++++++ .../activemq/artemis/core/server/Queue.java | 4 ++ .../artemis/core/server/impl/QueueImpl.java | 51 +++++++++++++++++--- .../impl/ScheduledDeliveryHandlerTest.java | 10 ++++ .../management/JMSQueueControlUsingJMSTest.java | 5 ++ .../management/QueueControlTest.java | 40 +++++++++++++++ .../management/QueueControlUsingCoreTest.java | 10 ++++ .../unit/core/postoffice/impl/FakeQueue.java | 12 +++++ 11 files changed, 174 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 3a3b349..3492892 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -99,6 +99,12 @@ public interface QueueControl { long getMessagesAcknowledged(); /** + * Returns the number of messages expired from this queue since it was created. + */ + @Attribute(desc = "number of messages expired from this queue since it was created") + long getMessagesExpired(); + + /** * Returns the first message on the queue as JSON */ @Attribute(desc = "first message on the queue as JSON") @@ -435,6 +441,12 @@ public interface QueueControl { void resetMessagesAcknowledged() throws Exception; /** + * Resets the MessagesExpired property + */ + @Operation(desc = "Resets the MessagesExpired property", impact = MBeanOperationInfo.ACTION) + void resetMessagesExpired() throws Exception; + + /** * it will flush one cycle on internal executors, so you would be sure that any pending tasks are done before you call * any other measure. * It is useful if you need the exact number of counts on a message http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java index 837ec68..c13e3b9 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java @@ -56,6 +56,12 @@ public interface JMSQueueControl extends DestinationControl { int getConsumerCount(); /** + * Returns the number of messages expired from this queue since it was created. + */ + @Attribute(desc = "the number of messages expired from this queue since it was created") + long getMessagesExpired(); + + /** * returns the selector for the queue */ @Attribute(desc = "selector for the queue") http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java index 0516182..b037d72 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java @@ -125,6 +125,11 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro } @Override + public long getMessagesExpired() { + return coreQueueControl.getMessagesExpired(); + } + + @Override public int getConsumerCount() { return coreQueueControl.getConsumerCount(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index de2459f..8f1d6e6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -257,6 +257,19 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } @Override + public long getMessagesExpired() { + checkStarted(); + + clearIO(); + try { + return queue.getMessagesExpired(); + } + finally { + blockOnIO(); + } + } + + @Override public long getID() { checkStarted(); @@ -1011,6 +1024,20 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } + @Override + public void resetMessagesExpired() throws Exception { + checkStarted(); + + clearIO(); + try { + queue.resetMessagesExpired(); + } + finally { + blockOnIO(); + } + + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index fa39c22..6645d36 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -121,6 +121,8 @@ public interface Queue extends Bindable { long getMessagesAcknowledged(); + long getMessagesExpired(); + MessageReference removeReferenceWithID(long id) throws Exception; MessageReference getReference(long id) throws ActiveMQException; @@ -234,6 +236,8 @@ public interface Queue extends Bindable { void resetMessagesAcknowledged(); + void resetMessagesExpired(); + void incrementMesssagesAdded(); /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index c809abf..724da5b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -167,6 +167,8 @@ public class QueueImpl implements Queue { private long messagesAcknowledged; + private long messagesExpired; + protected final AtomicInteger deliveringCount = new AtomicInteger(0); private boolean paused; @@ -962,6 +964,10 @@ public class QueueImpl implements Queue { @Override public void acknowledge(final MessageReference ref) throws Exception { + acknowledge(ref, OperationType.NORMAL); + } + + private void acknowledge(final MessageReference ref, OperationType type) throws Exception { if (ref.isPaged()) { pageSubscription.ack((PagedReference) ref); postAcknowledge(ref); @@ -977,12 +983,21 @@ public class QueueImpl implements Queue { postAcknowledge(ref); } - messagesAcknowledged++; + if (type == OperationType.EXPIRED) { + messagesExpired++; + } + else { + messagesAcknowledged++; + } } @Override public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception { + acknowledge(tx, ref, OperationType.NORMAL); + } + + private void acknowledge(final Transaction tx, final MessageReference ref, OperationType type) throws Exception { if (ref.isPaged()) { pageSubscription.ackTx(tx, (PagedReference) ref); @@ -1002,7 +1017,12 @@ public class QueueImpl implements Queue { getRefsOperation(tx).addAck(ref); } - messagesAcknowledged++; + if (type == OperationType.EXPIRED) { + messagesExpired++; + } + else { + messagesAcknowledged++; + } } @Override @@ -1075,13 +1095,13 @@ public class QueueImpl implements Queue { if (logger.isTraceEnabled()) { logger.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName()); } - move(null, expiryAddress, ref, true, false); + move(null, expiryAddress, ref, true, false, OperationType.EXPIRED); } else { if (logger.isTraceEnabled()) { logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName()); } - acknowledge(ref); + acknowledge(ref, OperationType.EXPIRED); } } @@ -1128,6 +1148,11 @@ public class QueueImpl implements Queue { } @Override + public long getMessagesExpired() { + return messagesExpired; + } + + @Override public int deleteAllReferences() throws Exception { return deleteAllReferences(DEFAULT_FLUSH_LIMIT); } @@ -1508,7 +1533,7 @@ public class QueueImpl implements Queue { refRemoved(ref); incDelivering(); try { - move(null, toAddress, ref, false, rejectDuplicate); + move(null, toAddress, ref, false, rejectDuplicate, OperationType.NORMAL); } catch (Exception e) { decDelivering(); @@ -2353,7 +2378,7 @@ public class QueueImpl implements Queue { } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name); - move(tx, deadLetterAddress, ref, false, false); + move(tx, deadLetterAddress, ref, false, false, OperationType.NORMAL); } } else { @@ -2367,7 +2392,8 @@ public class QueueImpl implements Queue { final SimpleString address, final MessageReference ref, final boolean expiry, - final boolean rejectDuplicate) throws Exception { + final boolean rejectDuplicate, + final OperationType type) throws Exception { Transaction tx; if (originalTX != null) { @@ -2384,7 +2410,7 @@ public class QueueImpl implements Queue { postOffice.route(copyMessage, null, tx, false, rejectDuplicate); - acknowledge(tx, ref); + acknowledge(tx, ref, type); if (originalTX == null) { tx.commit(); @@ -2634,6 +2660,11 @@ public class QueueImpl implements Queue { } @Override + public synchronized void resetMessagesExpired() { + messagesExpired = 0; + } + + @Override public float getRate() { float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f); if (timeSlice == 0) { @@ -2988,5 +3019,9 @@ public class QueueImpl implements Queue { } } } + + private enum OperationType { + EXPIRED, NORMAL + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index c96606d..301833e 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1047,6 +1047,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public long getMessagesExpired() { + return 0; + } + + @Override public MessageReference removeReferenceWithID(long id) throws Exception { return null; } @@ -1256,6 +1261,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public void resetMessagesExpired() { + + } + + @Override public void incrementMesssagesAdded() { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java index 2a966cf..8c14ff2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java @@ -120,6 +120,11 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest { } @Override + public long getMessagesExpired() { + return ((Number) proxy.retrieveAttributeValue("getMessagesExpired")).longValue(); + } + + @Override public String getDeadLetterAddress() { return (String) proxy.retrieveAttributeValue("deadLetterAddress"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 0073c8d..693b556 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -1984,6 +1984,46 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + @Test + public void testResetMessagesExpired() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, null, false); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(0, queueControl.getMessagesExpired()); + + ClientProducer producer = session.createProducer(address); + ClientMessage message = session.createMessage(false); + producer.send(message); + + // the message IDs are set on the server + Map<String, Object>[] messages = queueControl.listMessages(null); + Assert.assertEquals(1, messages.length); + long messageID = (Long) messages[0].get("messageID"); + + queueControl.expireMessage(messageID); + Assert.assertEquals(1, queueControl.getMessagesExpired()); + + message = session.createMessage(false); + producer.send(message); + + // the message IDs are set on the server + messages = queueControl.listMessages(null); + Assert.assertEquals(1, messages.length); + messageID = (Long) messages[0].get("messageID"); + + queueControl.expireMessage(messageID); + Assert.assertEquals(2, queueControl.getMessagesExpired()); + + queueControl.resetMessagesExpired(); + + Assert.assertEquals(0, queueControl.getMessagesExpired()); + + session.deleteQueue(queue); + } + //make sure notifications are always received no matter whether //a Queue is created via QueueControl or by JMSServerManager directly. @Test http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index 68dfd48..f27eaf1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -121,6 +121,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { } @Override + public long getMessagesExpired() { + return ((Number) proxy.retrieveAttributeValue("messagesExpired")).longValue(); + } + + @Override public void resetMessagesAdded() throws Exception { proxy.invokeOperation("resetMessagesAdded"); } @@ -131,6 +136,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { } @Override + public void resetMessagesExpired() throws Exception { + proxy.invokeOperation("resetMessagesExpired"); + } + + @Override public String getName() { return (String) proxy.retrieveAttributeValue("name"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 250d211..0633bfb 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -312,6 +312,12 @@ public class FakeQueue implements Queue { } @Override + public long getMessagesExpired() { + // no-op + return 0; + } + + @Override public void resetMessagesAdded() { // no-op @@ -324,6 +330,12 @@ public class FakeQueue implements Queue { } @Override + public void resetMessagesExpired() { + // no-op + + } + + @Override public void incrementMesssagesAdded() { }
