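Fix for https://issues.apache.org/jira/browse/AMQ-4916 - expose per-producer sent and per-subscription consumed message counts, plus a resetStatistics() operation, on the ProducerView and SubscriptionView JMX MBeans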
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f6ed548b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f6ed548b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f6ed548b Branch: refs/heads/activemq-5.9 Commit: f6ed548b82d4c4472b378871d646d27e9f519d0d Parents: 2e5c9d5 Author: Rob Davies <[email protected]> Authored: Tue Dec 3 17:15:02 2013 +0000 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 13:13:02 2014 -0400 ---------------------------------------------------------------------- .../org/apache/activemq/broker/jmx/ProducerView.java | 12 ++++++++++++ .../apache/activemq/broker/jmx/ProducerViewMBean.java | 9 ++++++++- .../apache/activemq/broker/jmx/SubscriptionView.java | 14 ++++++++++++++ .../activemq/broker/jmx/SubscriptionViewMBean.java | 7 +++++++ .../apache/activemq/broker/region/AbstractRegion.java | 3 +++ .../activemq/broker/region/AbstractSubscription.java | 7 +++++++ .../apache/activemq/broker/region/Subscription.java | 3 +++ .../org/apache/activemq/command/ProducerInfo.java | 6 ++++++ .../broker/region/QueueDuplicatesFromStoreTest.java | 6 ++++++ 9 files changed, 66 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f6ed548b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java index 1596d5e..6905c72 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java @@ -184,4 +184,16 @@ public class ProducerView implements ProducerViewMBean { 
producerBrokerExchange.resetFlowControl(); } } + + @Override + public void resetStatistics() { + if (info != null){ + info.getSentCount().reset(); + } + } + + @Override + public long getSentCount() { + return info != null ? info.getSentCount().getCount() :0; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f6ed548b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java index da357c1..14c2073 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java @@ -96,6 +96,13 @@ public interface ProducerViewMBean { @MBeanInfo("percentage of sends Producer Blocked for Flow Control") int getPercentageBlocked(); - @MBeanInfo("reset flow control stata") + @MBeanInfo("reset flow control state") void resetFlowControlStats(); + + @MBeanInfo("Resets statistics.") + void resetStatistics(); + + @MBeanInfo("Messages consumed") + long getSentCount(); + } http://git-wip-us.apache.org/repos/asf/activemq/blob/f6ed548b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java index 8201737..443a266 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java @@ -88,6 +88,8 @@ public class SubscriptionView implements SubscriptionViewMBean { return result; } + + private ObjectName 
createConnectionQuery(ManagementContext ctx, String brokerName) throws IOException { try { return BrokerMBeanSupport.createConnectionQuery(ctx.getJmxDomainName(), brokerName, clientId); @@ -415,4 +417,16 @@ public class SubscriptionView implements SubscriptionViewMBean { public String getUserName() { return userName; } + + @Override + public void resetStatistics() { + if (subscription != null){ + subscription.getConsumedCount().reset(); + } + } + + @Override + public long getConsumedCount() { + return subscription != null ? subscription.getConsumedCount().getCount() : 0; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f6ed548b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java index 9bbedc1..3c3aab3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionViewMBean.java @@ -242,4 +242,11 @@ public interface SubscriptionViewMBean { @MBeanInfo("ObjectName of the Connection that created this Subscription") ObjectName getConnection(); + + @MBeanInfo("Resets statistics.") + void resetStatistics(); + + @MBeanInfo("Messages consumed") + long getConsumedCount(); + } http://git-wip-us.apache.org/repos/asf/activemq/blob/f6ed548b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 168bd96..16deed4 100755 --- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -392,6 +392,9 @@ public abstract class AbstractRegion implements Region { } producerExchange.getRegionDestination().send(producerExchange, messageSend); + if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){ + producerExchange.getProducerState().getInfo().getSentCount().increment(); + } } public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq/blob/f6ed548b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 06a44bf..b2ff01c 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -36,6 +36,7 @@ import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.LogicExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NoLocalExpression; +import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.selector.SelectorParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +54,7 @@ public abstract class AbstractSubscription implements Subscription { private int cursorMemoryHighWaterMark = 70; private boolean slowConsumer; private long lastAckTime; + private CountStatisticImpl consumedCount = new CountStatisticImpl("consumed","The number of messages consumed"); public 
AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { this.broker = broker; @@ -88,6 +90,7 @@ public abstract class AbstractSubscription implements Subscription { @Override public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { this.lastAckTime = System.currentTimeMillis(); + this.consumedCount.increment(); } @Override @@ -276,4 +279,8 @@ public abstract class AbstractSubscription implements Subscription { public void setTimeOfLastMessageAck(long value) { this.lastAckTime = value; } + + public CountStatisticImpl getConsumedCount(){ + return consumedCount; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f6ed548b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java index dfd427d..b79b37e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -30,6 +30,7 @@ import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.Response; import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.management.CountStatisticImpl; /** * @@ -234,4 +235,6 @@ public interface Subscription extends SubscriptionRecovery { */ long getTimeOfLastMessageAck(); + CountStatisticImpl getConsumedCount(); + } http://git-wip-us.apache.org/repos/asf/activemq/blob/f6ed548b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java ---------------------------------------------------------------------- diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java index 9854c5e..05ef3a4 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.command; +import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.state.CommandVisitor; /** @@ -32,6 +33,7 @@ public class ProducerInfo extends BaseCommand { protected BrokerId[] brokerPath; protected boolean dispatchAsync; protected int windowSize; + protected CountStatisticImpl sentCount = new CountStatisticImpl("sentCount","number of messages sent to a broker"); public ProducerInfo() { } @@ -135,4 +137,8 @@ public class ProducerInfo extends BaseCommand { this.windowSize = windowSize; } + public CountStatisticImpl getSentCount(){ + return sentCount; + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/f6ed548b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index 6fe28fa..9da839d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -44,6 +44,7 @@ import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.Response; import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.management.CountStatisticImpl; import 
org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -339,6 +340,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase { public long getTimeOfLastMessageAck() { return 0; } + + @Override + public CountStatisticImpl getConsumedCount() { + return null; + } }; queue.addSubscription(contextNotInTx, subscription);
