Repository: activemq Updated Branches: refs/heads/master d9c74d731 -> 511b9b642
[AMQ-6562] - suppress warn of durable sub duplicate from the store on cache exhaustion - expected in the absense of ordered sequenceid and setBatch. Fix leak of duplicates pending processing on batch fill for the durable sub case and remove eager page in for prefetch=0 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/511b9b64 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/511b9b64 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/511b9b64 Branch: refs/heads/master Commit: 511b9b642a198fe98409e1f1aac6176b9420d951 Parents: d9c74d7 Author: gtully <gary.tu...@gmail.com> Authored: Fri Jan 13 11:04:48 2017 +0000 Committer: gtully <gary.tu...@gmail.com> Committed: Fri Jan 13 11:06:05 2017 +0000 ---------------------------------------------------------------------- .../broker/region/PrefetchSubscription.java | 2 +- .../region/cursors/AbstractStoreCursor.java | 9 +- .../region/cursors/TopicStorePrefetch.java | 9 + .../activemq/usecases/DurableSubCacheTest.java | 197 +++++++++++++++++++ 4 files changed, 214 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/511b9b64/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 0a277fb..db133c1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -684,7 +684,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { setPendingBatchSize(pending, numberToDispatch); int count = 0; pending.reset(); - while (pending.hasNext() && !isFull() && count < numberToDispatch) { + while (count < numberToDispatch && !isFull() && pending.hasNext()) { MessageReference node = pending.next(); if (node == null) { break; http://git-wip-us.apache.org/repos/asf/activemq/blob/511b9b64/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 4d7ffea..0295b33 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -110,8 +110,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i recovered = true; } else if (!cached) { // a duplicate from the store (!cached) - needs to be removed/acked - otherwise it will get re dispatched on restart - if (message.isRecievedByDFBridge()) { - // expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true + if (duplicateFromStoreExcepted(message)) { if (LOG.isTraceEnabled()) { LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong()); } @@ -128,6 +127,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return recovered; } + protected boolean duplicateFromStoreExcepted(Message message) { + // expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true for + // which this existing unused flag has been repurposed + return message.isRecievedByDFBridge(); + } + public static boolean gotToTheStore(Message message) throws Exception { if (message.isRecievedByDFBridge()) { // concurrent store and dispatch - wait to see if the message gets to the store to see http://git-wip-us.apache.org/repos/asf/activemq/blob/511b9b64/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index bfc745e..71bb4eb 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -95,6 +95,14 @@ class TopicStorePrefetch extends AbstractStoreCursor { } @Override + protected boolean duplicateFromStoreExcepted(Message message) { + // setBatch is not implemented - sequence order not reliable with concurrent transactions + // on cache exhaustion - first pageIn starts from last ack location which may replay what + // cursor has dispatched + return true; + } + + @Override protected synchronized int getStoreSize() { try { return store.getMessageCount(clientId, subscriberName); @@ -137,6 +145,7 @@ class TopicStorePrefetch extends AbstractStoreCursor { this.storeHasMessages = false; this.store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this); + dealWithDuplicates(); if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) { this.storeHasMessages = true; } http://git-wip-us.apache.org/repos/asf/activemq/blob/511b9b64/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubCacheTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubCacheTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubCacheTest.java new file mode 100644 index 0000000..17d1931 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubCacheTest.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.cursors.AbstractStoreCursor; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.TopicSubscriber; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +public class DurableSubCacheTest { + private static final Logger LOG = LoggerFactory.getLogger(DurableSubCacheTest.class); + + + private final ActiveMQTopic topic = new ActiveMQTopic("T1"); + private BrokerService broker; + + @Before + public void setUp() throws Exception { + + broker = createAndStartBroker(); + broker.waitUntilStarted(); + } + + + private BrokerService createAndStartBroker() + throws Exception { + BrokerService broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setUseJmx(false); + broker.setAdvisorySupport(false); + broker.getSystemUsage().getMemoryUsage().setLimit(100 * 1024); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setCursorMemoryHighWaterMark(20); + policyMap.put(topic, policy); + broker.setDestinationPolicy(policyMap); + + broker.start(); + + return broker; + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + @Test + public void testCacheExhaustion() throws Exception { + doTestCacheExhaustion(1000); + } + + @Test + public void testCacheExhaustionPrefetch0() throws Exception { + doTestCacheExhaustion(0); + } + + public void doTestCacheExhaustion(int prefetch) throws Exception { + + createDurableSub(topic, "my_sub_1"); + + publishMesssages(topic, 20); + + org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(AbstractStoreCursor.class.getCanonicalName()); + final AtomicBoolean failed = new AtomicBoolean(false); + + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getLevel() == Level.WARN) { + LOG.info("Got warn event:" + event.getRenderedMessage()); + failed.set(true); + } + } + }; + log4jLogger.addAppender(appender); + + try { + consumeDurableSub(topic, "my_sub_1", 20, prefetch); + } finally { + log4jLogger.removeAppender(appender); + } + + assertFalse("no warning from the cursor", failed.get()); + } + + private void publishMesssages(ActiveMQTopic topic, int messageCount) throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + connectionFactory.setWatchTopicAdvisories(false); + Connection con = connectionFactory.createConnection(); + con.start(); + + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(topic); + + try { + String textMessage = new String(new byte[1024]); + TextMessage msg = session.createTextMessage(textMessage); + + for (int i = 0; i < messageCount; i++) { + producer.send(msg); + } + } finally { + con.close(); + } + + } + + + private void createDurableSub(ActiveMQTopic topic, String subID) throws Exception { + + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + connectionFactory.setWatchTopicAdvisories(false); + Connection con = connectionFactory.createConnection(); + con.setClientID("CONNECTION-" + subID); + con.start(); + + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + + session.createDurableSubscriber(topic, subID, null, true); + session.close(); + con.close(); + } + + private void consumeDurableSub(ActiveMQTopic topic, String subID, int messageCount) throws Exception { + consumeDurableSub(topic, subID, messageCount, 1000); + } + + private void consumeDurableSub(ActiveMQTopic topic, String subID, int messageCount, int prefetch) throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + ActiveMQConnection con = (ActiveMQConnection) connectionFactory.createConnection(); + con.setClientID("CONNECTION-" + subID); + con.getPrefetchPolicy().setAll(prefetch); + con.start(); + + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + + TopicSubscriber topicSubscriber + = session.createDurableSubscriber(topic, subID, null, true); + + try { + + for (int i = 0; i < messageCount; i++) { + javax.jms.Message message = topicSubscriber.receive(4000l); + if (message == null) { + fail("should have received a message"); + } + } + + } finally { + con.close(); + } + } + + +} \ No newline at end of file