Repository: activemq Updated Branches: refs/heads/master 76490a2c7 -> 83514ef79
AMQ-7001 - ensure cursor pending cached id list is pruned of futures that end in an exception, fix and test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/83514ef7 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/83514ef7 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/83514ef7 Branch: refs/heads/master Commit: 83514ef799cb71c3ed1ee1f81553d87383f2cd42 Parents: 76490a2 Author: gtully <[email protected]> Authored: Tue Jul 3 21:18:36 2018 +0100 Committer: gtully <[email protected]> Committed: Tue Jul 3 21:18:36 2018 +0100 ---------------------------------------------------------------------- .../region/cursors/AbstractStoreCursor.java | 17 ++++- .../broker/region/cursors/StoreQueueCursor.java | 5 ++ .../store/kahadb/ErrorOnFutureSendTest.java | 65 +++++++++++++++++++- 3 files changed, 84 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/83514ef7/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 8b910bf..5140c72 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.ListIterator; import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -350,8 +351,20 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i final Object futureOrLong = candidate.getFutureOrSequenceLong(); if (futureOrLong instanceof Future) { Future future = (Future) futureOrLong; - if (future.isCancelled()) { - it.remove(); + if (future.isDone()) { + if (future.isCancelled()) { + it.remove(); + } else { + // check for exception, we may be seeing old state + try { + future.get(0, TimeUnit.SECONDS); + // stale; if we get a result next prune will see Long + } catch (ExecutionException expected) { + it.remove(); + } catch (Exception unexpected) { + LOG.debug("{} unexpected exception verifying exception state of future", this, unexpected); + } + } } else { // we don't want to wait for work to complete break; http://git-wip-us.apache.org/repos/asf/activemq/blob/83514ef7/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index 3cb1cc2..a7b4c6e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -219,6 +219,11 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { this.nonPersistent = nonPersistent; } + /** + * @return the persistent Cursor + */ + public PendingMessageCursor getPersistent() { return this.persistent; } + @Override public void setMaxBatchSize(int maxBatchSize) { persistent.setMaxBatchSize(maxBatchSize); http://git-wip-us.apache.org/repos/asf/activemq/blob/83514ef7/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java index 8f0a289..e270dda 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/ErrorOnFutureSendTest.java @@ -20,9 +20,14 @@ package org.apache.activemq.store.kahadb; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.cursors.PendingMessageCursor; +import org.apache.activemq.broker.region.cursors.StoreQueueCursor; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.TransactionIdTransformer; +import org.apache.activemq.util.Wait; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -33,10 +38,14 @@ import org.slf4j.LoggerFactory; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import java.io.File; +import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -120,4 +129,58 @@ public class ErrorOnFutureSendTest { connection.close(); } -} \ No newline at end of file + + @Test(timeout = 30000) + public void testSuccessiveFailedSendsDoesNotConsumeMemInError() throws Exception { + + adapter.setTransactionIdTransformer(new TransactionIdTransformer() { + @Override + public TransactionId transform(TransactionId txid) { + throw new RuntimeException("Bla"); + } + }); + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + connectionFactory.setWatchTopicAdvisories(false); + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(destination); + Message message = session.createMessage(); + + final AtomicInteger received = new AtomicInteger(); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + received.incrementAndGet(); + } + }); + + final int numIterations = 10; + for (int i=0; i<numIterations; i++) { + try { + producer.send(message); + fail("Expect exception"); + } catch (JMSException expected) {} + } + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return received.get() == numIterations; + } + }); + consumer.close(); + connection.close(); + + RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + Queue queue = (Queue) regionBroker.getQueueRegion().getDestinationMap().get(destination); + StoreQueueCursor storeQueueCursor = (StoreQueueCursor) queue.getMessages(); + PendingMessageCursor queueStorePrefetch = storeQueueCursor.getPersistent(); + LOG.info("QueueStorePrefetch {}", queueStorePrefetch); + String toString = queueStorePrefetch.toString(); + assertTrue("contains pendingCachedIds.size:1", toString.contains("pendingCachedIds.size:1")); + } +}
