Repository: activemq
Updated Branches:
  refs/heads/master acb8602ad -> ee54f0930
https://issues.apache.org/jira/browse/AMQ-5831 - revisit topic subscriptions Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ee54f093 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ee54f093 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ee54f093 Branch: refs/heads/master Commit: ee54f09303f52d2753ce9ac8e64008e3e60c2eab Parents: acb8602 Author: Dejan Bosanac <[email protected]> Authored: Wed Aug 26 12:28:19 2015 +0200 Committer: Dejan Bosanac <[email protected]> Committed: Wed Aug 26 12:28:39 2015 +0200 ---------------------------------------------------------------------- .../broker/region/DurableTopicSubscription.java | 1 + .../broker/region/TopicSubscription.java | 65 +++++------- .../apache/activemq/broker/jmx/MBeanTest.java | 100 +++++++++++++++++-- .../DurableSubscriptionOffline2Test.java | 4 +- 4 files changed, 119 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ee54f093/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 0107c58..cf60fdf 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -322,6 +322,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us regionDestination.acknowledge(context, this, ack, node); redeliveredMessages.remove(node.getMessageId()); node.decrementReferenceCount(); + 
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/ee54f093/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index b20d080..d3e683d 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -279,37 +279,16 @@ public class TopicSubscription extends AbstractSubscription { if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) { if (context.isInTransaction()) { context.getTransaction().addSynchronization(new Synchronization() { - @Override public void afterCommit() throws Exception { - synchronized (TopicSubscription.this) { - if (singleDestination && destination != null) { - destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); - } - } - getSubscriptionStatistics().getDequeues().add(ack.getMessageCount()); - updateInflightMessageSizeOnAck(ack); + updateStatsOnAck(ack); dispatchMatched(); } }); } else { - if (singleDestination && destination != null) { - destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); - destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); - if (info.isNetworkSubscription()) { - destination.getDestinationStatistics().getForwards().add(ack.getMessageCount()); - } - } - getSubscriptionStatistics().getDequeues().add(ack.getMessageCount()); - updateInflightMessageSizeOnAck(ack); - } - while (true) { - int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(0, currentExtension - 
ack.getMessageCount()); - if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { - break; - } + updateStatsOnAck(ack); } + updatePrefetch(ack); dispatchMatched(); return; } else if (ack.isDeliveredAck()) { @@ -318,19 +297,8 @@ public class TopicSubscription extends AbstractSubscription { dispatchMatched(); return; } else if (ack.isExpiredAck()) { - if (singleDestination && destination != null) { - destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); - destination.getDestinationStatistics().getExpired().add(ack.getMessageCount()); - destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); - } - getSubscriptionStatistics().getDequeues().add(ack.getMessageCount()); - while (true) { - int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(0, currentExtension - ack.getMessageCount()); - if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { - break; - } - } + updateStatsOnAck(ack); + updatePrefetch(ack); dispatchMatched(); return; } else if (ack.isRedeliveredAck()) { @@ -393,10 +361,10 @@ public class TopicSubscription extends AbstractSubscription { } /** - * Update the inflight statistics on message ack. + * Update the statistics on message ack. 
* @param ack */ - private void updateInflightMessageSizeOnAck(final MessageAck ack) { + private void updateStatsOnAck(final MessageAck ack) { synchronized(dispatchLock) { boolean inAckRange = false; List<MessageReference> removeList = new ArrayList<MessageReference>(); @@ -417,6 +385,25 @@ public class TopicSubscription extends AbstractSubscription { for (final MessageReference node : removeList) { dispatched.remove(node); getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); + getSubscriptionStatistics().getDequeues().increment(); + ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); + ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); + if (info.isNetworkSubscription()) { + ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); + } + if (ack.isExpiredAck()) { + destination.getDestinationStatistics().getExpired().add(ack.getMessageCount()); + } + } + } + } + + private void updatePrefetch(MessageAck ack) { + while (true) { + int currentExtension = prefetchExtension.get(); + int newExtension = Math.max(0, currentExtension - ack.getMessageCount()); + if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { + break; } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/ee54f093/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index acf706d..b9b8e2f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -20,21 +20,13 @@ import java.io.BufferedReader; import java.io.InputStreamReader; 
import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; +import javax.jms.*; import javax.management.MBeanServer; import javax.management.MBeanServerInvocationHandler; import javax.management.MalformedObjectNameException; @@ -59,6 +51,7 @@ import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.memory.list.MessageList; import org.apache.activemq.util.JMXSupport; import org.apache.activemq.util.URISupport; import org.apache.activemq.util.Wait; @@ -1621,4 +1614,91 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { assertNotNull("Message: " + i, consumer.receive(5000)); } } + + public void testTopicView() throws Exception { + connection = connectionFactory.createConnection(); + connection.setClientID("test"); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + connection.start(); + + Topic singleTopic = session.createTopic("test.topic"); + Topic wildcardTopic = session.createTopic("test.>"); + + TopicSubscriber durable1 = session.createDurableSubscriber(singleTopic, "single"); + TopicSubscriber durable2 = session.createDurableSubscriber(wildcardTopic, "wildcard"); + + MessageConsumer consumer1 = session.createConsumer(singleTopic); + MessageConsumer consumer2 = session.createConsumer(wildcardTopic); + + final ArrayList<Message> messages = new ArrayList<>(); + + MessageListener 
listener = new MessageListener() { + @Override + public void onMessage(Message message) { + messages.add(message); + } + }; + + durable1.setMessageListener(listener); + durable2.setMessageListener(listener); + consumer1.setMessageListener(listener); + consumer2.setMessageListener(listener); + + MessageProducer producer = session.createProducer(singleTopic); + producer.send(session.createTextMessage("test")); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return messages.size() == 4; + } + }); + + ObjectName topicObjName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic"); + final TopicViewMBean topicView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName, TopicViewMBean.class, true); + + assertEquals(1, topicView.getEnqueueCount()); + assertEquals(4, topicView.getDispatchCount()); + assertEquals(4, topicView.getInFlightCount()); + assertEquals(0, topicView.getDequeueCount()); + + ArrayList<SubscriptionViewMBean> subscriberViews = new ArrayList(); + for (ObjectName name : topicView.getSubscriptions()) { + subscriberViews.add(MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true)); + } + + assertEquals(4, subscriberViews.size()); + + for (SubscriptionViewMBean subscriberView : subscriberViews) { + assertEquals(1, subscriberView.getEnqueueCounter()); + assertEquals(1, subscriberView.getDispatchedCounter()); + assertEquals(0, subscriberView.getDequeueCounter()); + } + + for (Message message : messages) { + try { + message.acknowledge(); + } catch (JMSException ignore) {} + } + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return topicView.getDequeueCount() == 4; + } + }); + + assertEquals(1, topicView.getEnqueueCount()); + assertEquals(4, topicView.getDispatchCount()); + assertEquals(0, topicView.getInFlightCount()); + 
assertEquals(4, topicView.getDequeueCount()); + + for (SubscriptionViewMBean subscriberView : subscriberViews) { + assertEquals(1, subscriberView.getEnqueueCounter()); + assertEquals(1, subscriberView.getDispatchedCounter()); + assertEquals(1, subscriberView.getDequeueCounter()); + } + + + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/ee54f093/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java index 960d9ea..f288340 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java @@ -119,7 +119,7 @@ public class DurableSubscriptionOffline2Test extends DurableSubscriptionOfflineT ObjectName destinationName = broker.getAdminView().getTopics()[0]; TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true); assertEquals("correct enqueue", 10, topicView.getEnqueueCount()); - assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount()); + assertEquals("topic view dequeue not updated", 5, topicView.getDequeueCount()); assertEquals("inflight", 5, topicView.getInFlightCount()); session.close(); @@ -138,7 +138,7 @@ public class DurableSubscriptionOffline2Test extends DurableSubscriptionOfflineT // destination view assertEquals("correct enqueue", 10, topicView.getEnqueueCount()); - assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, 
topicView.getDequeueCount()); + assertEquals("topic view dequeue not updated", 5, topicView.getDequeueCount()); assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount()); // consume the rest
