https://issues.apache.org/jira/browse/AMQ-5289 - add forwardCount to destinationStatistics - allow local consumption to be accounted with dequeueCount - forwardCount so forwarded messages are not accounted for num hops times
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e4183ec4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e4183ec4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e4183ec4 Branch: refs/heads/activemq-5.10.x Commit: e4183ec48d51efb086ca4d9ce1cbc458931f98c6 Parents: 9146785 Author: gtully <[email protected]> Authored: Fri Jul 25 11:46:36 2014 +0100 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Dec 17 14:56:21 2014 -0500 ---------------------------------------------------------------------- .../apache/activemq/broker/jmx/DestinationView.java | 5 +++++ .../activemq/broker/jmx/DestinationViewMBean.java | 10 ++++++++++ .../broker/region/DestinationStatistics.java | 10 ++++++++++ .../org/apache/activemq/broker/region/Queue.java | 3 +++ .../activemq/broker/region/TopicSubscription.java | 3 +++ .../activemq/console/command/DstatCommand.java | 9 ++++++--- .../org/apache/activemq/broker/jmx/MBeanTest.java | 1 + .../activemq/network/DemandForwardingBridgeTest.java | 15 +++++++++++++++ .../usecases/ThreeBrokerTopicNetworkTest.java | 4 ++++ 9 files changed, 57 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 3f62943..ec6fe7c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -90,6 +90,11 @@ public class DestinationView implements DestinationViewMBean { } @Override + public long getForwardCount() { + return destination.getDestinationStatistics().getForwards().getCount(); + } + + @Override public long getDispatchCount() { return destination.getDestinationStatistics().getDispatched().getCount(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index f83d47e..a42bcfa 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -70,6 +70,16 @@ public interface DestinationViewMBean { long getDequeueCount(); /** + * Returns the number of messages that have been acknowledged by network subscriptions from the + * destination. + * + * @return The number of messages that have been acknowledged by network subscriptions from the + * destination. + */ + @MBeanInfo("Number of messages that have been forwarded (to a networked broker) from the destination.") + long getForwardCount(); + + /** * Returns the number of messages that have been dispatched but not * acknowledged * http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java index ee2b478..0a9176e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java @@ -31,6 +31,7 @@ public class DestinationStatistics extends StatsImpl { protected CountStatisticImpl enqueues; protected CountStatisticImpl dequeues; + protected CountStatisticImpl forwards; protected CountStatisticImpl consumers; protected CountStatisticImpl producers; protected CountStatisticImpl messages; @@ -49,6 +50,7 @@ public class DestinationStatistics extends StatsImpl { enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination"); dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination"); dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination"); + forwards = new CountStatisticImpl("forwards", "The number of messages that have been forwarded to a networked broker from the destination"); inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement"); expired = new CountStatisticImpl("expired", "The number of messages that have expired"); @@ -86,6 +88,10 @@ public class DestinationStatistics extends StatsImpl { return dequeues; } + public CountStatisticImpl getForwards() { + return forwards; + } + public CountStatisticImpl getInflight() { return inflight; } @@ -137,6 +143,7 @@ public class DestinationStatistics extends StatsImpl { super.reset(); enqueues.reset(); dequeues.reset(); + forwards.reset(); dispatched.reset(); inflight.reset(); expired.reset(); @@ -151,6 +158,7 @@ public class DestinationStatistics extends StatsImpl { enqueues.setEnabled(enabled); dispatched.setEnabled(enabled); dequeues.setEnabled(enabled); + forwards.setEnabled(enabled); inflight.setEnabled(enabled); expired.setEnabled(true); consumers.setEnabled(enabled); @@ -169,6 +177,7 @@ public class DestinationStatistics extends StatsImpl { enqueues.setParent(parent.enqueues); dispatched.setParent(parent.dispatched); dequeues.setParent(parent.dequeues); + forwards.setParent(parent.forwards); inflight.setParent(parent.inflight); expired.setParent(parent.expired); consumers.setParent(parent.consumers); @@ -183,6 +192,7 @@ public class DestinationStatistics extends StatsImpl { enqueues.setParent(null); dispatched.setParent(null); dequeues.setParent(null); + forwards.setParent(null); inflight.setParent(null); expired.setParent(null); consumers.setParent(null); http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 06c74db..647ba68 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1810,6 +1810,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { } finally { messagesLock.writeLock().unlock(); } + if (sub != null && sub.getConsumerInfo().isNetworkSubscription()) { + getDestinationStatistics().getForwards().increment(); + } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index d17fb2f..6b61379 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -285,6 +285,9 @@ public class TopicSubscription extends AbstractSubscription { if (singleDestination && destination != null) { destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); + if (info.isNetworkSubscription()) { + destination.getDestinationStatistics().getForwards().add(ack.getMessageCount()); + } } dequeueCounter.addAndGet(ack.getMessageCount()); } http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java ---------------------------------------------------------------------- diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java index a6d6356..8a41d6a 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/DstatCommand.java @@ -95,7 +95,7 @@ public class DstatCommand extends AbstractJmxCommand { // sort list so the names is A..Z Collections.sort(queueList, new ObjectInstanceComparator()); - context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %")); + context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %")); // Iterate through the queue result for (Object view : queueList) { @@ -115,6 +115,7 @@ public class DstatCommand extends AbstractJmxCommand { queueView.getConsumerCount(), queueView.getEnqueueCount(), queueView.getDequeueCount(), + queueView.getForwardCount(), queueView.getMemoryPercentUsage())); } } @@ -128,7 +129,7 @@ public class DstatCommand extends AbstractJmxCommand { final String header = "%-50s %10s %10s %10s %10s %10s %10s"; final String tableRow = "%-50s %10d %10d %10d %10d %10d %10d"; - context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %")); + context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %")); Collections.sort(queueList, new ObjectInstanceComparator()); @@ -150,6 +151,7 @@ public class DstatCommand extends AbstractJmxCommand { queueView.getConsumerCount(), queueView.getEnqueueCount(), queueView.getDequeueCount(), + queueView.getForwardCount(), queueView.getMemoryPercentUsage())); } } @@ -166,7 +168,7 @@ public class DstatCommand extends AbstractJmxCommand { // sort list so the names is A..Z Collections.sort(topicsList, new ObjectInstanceComparator()); - context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %")); + context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %")); // Iterate through the topics result for (Object view : topicsList) { @@ -186,6 +188,7 @@ public class DstatCommand extends AbstractJmxCommand { topicView.getConsumerCount(), topicView.getEnqueueCount(), topicView.getDequeueCount(), + topicView.getForwardCount(), topicView.getMemoryPercentUsage())); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index a853b3e..30487c8 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -169,6 +169,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage()); assertTrue("use cache", queueNew.isUseCache()); assertTrue("cache enabled", queueNew.isCacheEnabled()); + assertEquals("no forwards", 0, queueNew.getForwardCount()); } public void testRemoveMessages() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java index 1491ba2..020a511 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java @@ -21,6 +21,7 @@ import javax.jms.DeliveryMode; import junit.framework.Test; import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; @@ -72,6 +73,11 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport { // Close consumer to cause the message to rollback. connection1.send(consumerInfo1.createRemoveCommand()); + final DestinationStatistics destinationStatistics = broker.getDestination(destination).getDestinationStatistics(); + assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount()); + assertEquals("broker dest stat dequeues", 0, destinationStatistics.getDequeues().getCount()); + assertEquals("broker dest stat forwards", 0, destinationStatistics.getForwards().getCount()); + // Now create remote consumer that should cause message to move to this // remote consumer. ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); @@ -84,6 +90,15 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport { return receiveMessage(connection2) != null; } })); + + assertTrue("broker dest stat forwards", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 1 == destinationStatistics.getForwards().getCount(); + } + })); + assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount()); + assertEquals("remote broker dest stat dequeues", 1, remoteBroker.getDestination(destination).getDestinationStatistics().getDequeues().getCount()); } public void initCombosForTestAddConsumerThenSend() { http://git-wip-us.apache.org/repos/asf/activemq/blob/e4183ec4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java index 33963b7..99deb28 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java @@ -31,6 +31,7 @@ import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.util.MessageIdList; /** @@ -77,6 +78,9 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { assertEquals(MESSAGE_COUNT, msgsA.getMessageCount()); assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount()); assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount()); + + assertEquals("Correct forwards from A", MESSAGE_COUNT, + brokers.get("BrokerA").broker.getDestination(ActiveMQDestination.transform(dest)).getDestinationStatistics().getForwards().getCount()); } public void initCombosForTestABandBCbrokerNetworkWithSelectors() {
