Repository: activemq Updated Branches: refs/heads/master b29eb384b -> 2562cf21a
https://issues.apache.org/jira/browse/AMQ-5718 - don't add messages to subscriber while it's discarding Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2562cf21 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2562cf21 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2562cf21 Branch: refs/heads/master Commit: 2562cf21a2fdfd3b0301160e28b7109d85f38cfe Parents: b29eb38 Author: Dejan Bosanac <[email protected]> Authored: Mon Apr 13 11:16:35 2015 +0200 Committer: Dejan Bosanac <[email protected]> Committed: Mon Apr 13 11:16:55 2015 +0200 ---------------------------------------------------------------------- .../broker/region/TopicSubscription.java | 33 +++++++++++++------- .../apache/activemq/advisory/AdvisoryTests.java | 28 +++++++++++++++-- 2 files changed, 47 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/2562cf21/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index e81be74..fe3d911 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -73,6 +73,7 @@ public class TopicSubscription extends AbstractSubscription { protected boolean enableAudit = false; protected ActiveMQMessageAudit audit; protected boolean active = false; + protected boolean discarding = false; public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception { super(broker, context, info); @@ -107,6 +108,11 @@ public class TopicSubscription extends AbstractSubscription { node = new IndirectMessageReference(node.getMessage()); enqueueCounter.incrementAndGet(); synchronized (matchedListMutex) { + // if this subscriber is already discarding a message, we don't want to add + // any more messages to it as those messages can only be advisories generated in the process, + // which can trigger the recursive call loop + if (discarding) return; + if (!isFull() && matched.isEmpty()) { // if maximumPendingMessages is set we will only discard messages which // have not been dispatched (i.e. we allow the prefetch buffer to be filled) @@ -639,18 +645,23 @@ public class TopicSubscription extends AbstractSubscription { } private void discard(MessageReference message) { - message.decrementReferenceCount(); - matched.remove(message); - discarded++; - if(destination != null) { - destination.getDestinationStatistics().getDequeues().increment(); - } - LOG.debug("{}, discarding message {}", this, message); - Destination dest = (Destination) message.getRegionDestination(); - if (dest != null) { - dest.messageDiscarded(getContext(), this, message); + discarding = true; + try { + message.decrementReferenceCount(); + matched.remove(message); + discarded++; + if (destination != null) { + destination.getDestinationStatistics().getDequeues().increment(); + } + LOG.debug("{}, discarding message {}", this, message); + Destination dest = (Destination) message.getRegionDestination(); + if (dest != null) { + dest.messageDiscarded(getContext(), this, message); + } + broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId())); + } finally { + discarding = false; } - broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId())); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/2562cf21/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java index 4bb9053..5e5eb7f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java @@ -33,11 +33,11 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.*; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; /** * @@ -161,6 +161,28 @@ public class AdvisoryTests extends TestCase { assertNotNull(msg); } + public void testMessageDLQd() throws Exception { + ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy(); + policy.setTopicPrefetch(2); + ((ActiveMQConnection)connection).setPrefetchPolicy(policy); + Session s = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Topic topic = s.createTopic(getClass().getName()); + + Topic advisoryTopic = s.createTopic(">"); + for (int i = 0; i < 100; i++) { + MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); + } + + + MessageProducer producer = s.createProducer(topic); + int count = 10; + for (int i = 0; i < count; i++) { + BytesMessage m = s.createBytesMessage(); + producer.send(m); + } + // we should get here without StackOverflow + } + public void xtestMessageDiscardedAdvisory() throws Exception { Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = s.createTopic(getClass().getName());
