https://issues.apache.org/jira/browse/AMQ-4485 - fix test regression with browse test - AMQ4595Test - reduce replay window when sync and async cursor updates flip message order - concurrentStoreAndDispatch=true - https://issues.apache.org/jira/browse/AMQ-5266 - increase default audit depth to match async jobs for concurrent store
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/97c127d2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/97c127d2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/97c127d2 Branch: refs/heads/trunk Commit: 97c127d2d4086709a50a9da6f17773d6b1a5fc33 Parents: a56996d Author: gtully <[email protected]> Authored: Thu Oct 9 13:47:31 2014 +0100 Committer: gtully <[email protected]> Committed: Thu Oct 9 13:47:31 2014 +0100 ---------------------------------------------------------------------- .../activemq/broker/region/BaseDestination.java | 2 +- .../region/cursors/AbstractStoreCursor.java | 107 ++++++++++++++----- .../region/cursors/QueueStorePrefetch.java | 2 +- .../activemq/store/kahadb/KahaDBStore.java | 3 +- .../activemq/bugs/AMQ4485LowLimitTest.java | 2 - .../org/apache/activemq/bugs/AMQ4595Test.java | 13 +-- 6 files changed, 92 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 601f59c..5a41df3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -57,7 +57,7 @@ public abstract class BaseDestination implements Destination { public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; public static final int MAX_PRODUCERS_TO_AUDIT = 64; - public static final int MAX_AUDIT_DEPTH = 2048; + public static final int 
MAX_AUDIT_DEPTH = 10000; protected final ActiveMQDestination destination; protected final Broker broker; http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index bef6017..c08b293 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.cursors; import java.util.Iterator; import java.util.LinkedList; +import java.util.ListIterator; import java.util.concurrent.Future; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; @@ -41,7 +42,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i private boolean storeHasMessages = false; protected int size; private LinkedList<MessageId> pendingCachedIds = new LinkedList<>(); - MessageId lastCachedId = null; + private static int SYNC_ADD = 0; + private static int ASYNC_ADD = 1; + final MessageId[] lastCachedIds = new MessageId[2]; protected boolean hadSpace = false; protected AbstractStoreCursor(Destination destination) { @@ -203,12 +206,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } if (isCacheEnabled()) { if (recoverMessage(node.getMessage(),true)) { - if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) { - pruneLastCached(); - pendingCachedIds.add(node.getMessageId()); - } else { - setLastCachedId(node.getMessageId()); - } + trackLastCached(node); } else { dealWithDuplicates(); return 
false; @@ -219,24 +217,78 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } if (disableCache && isCacheEnabled()) { + LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong()); setCacheEnabled(false); - // sync with store on disabling the cache - if (!pendingCachedIds.isEmpty() || lastCachedId != null) { - LOG.trace("{} - disabling cache. current Id: {} seq: {}, batchList size: {}", - new Object[]{this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong(), batchList.size()}); - pruneLastCached(); - if (lastCachedId != null) { - setBatch(lastCachedId); - lastCachedId = null; - pendingCachedIds.clear(); - } - } + syncWithStore(); } this.storeHasMessages = true; size++; return true; } + private void syncWithStore() throws Exception { + if (lastCachedIds[SYNC_ADD] == null) { + // only async adds, lets wait on the potential last add and reset from there + for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) { + MessageId lastStored = it.previous(); + Object futureOrLong = lastStored.getFutureOrSequenceLong(); + if (futureOrLong instanceof Future) { + Future future = (Future) futureOrLong; + if (future.isCancelled()) { + continue; + } else { + try { + future.get(); + setLastCachedId(ASYNC_ADD, lastStored); + } catch (Exception ignored) {} + } + } + } + if (lastCachedIds[ASYNC_ADD] != null) { + setBatch(lastCachedIds[ASYNC_ADD]); + } + } else { + // mix of async and sync - async can exceed sync only if next in sequence + for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) { + MessageId candidate = it.next(); + final Object futureOrLong = candidate.getFutureOrSequenceLong(); + if (futureOrLong instanceof Future) { + Future future = (Future) futureOrLong; + if (future.isCancelled()) { + it.remove(); + } else { + try { + future.get(); + long next = 1 + 
(Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong(); + if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), next) == 0) { + setLastCachedId(SYNC_ADD, candidate); + } else { + // out of sequence, revert to sync state + LOG.trace("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size()); + break; + } + } catch (Exception ignored) {} + } + } + } + if (lastCachedIds[SYNC_ADD] != null) { + setBatch(lastCachedIds[SYNC_ADD]); + } + + } + // cleanup + lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null; + pendingCachedIds.clear(); + } + + private void trackLastCached(MessageReference node) { + if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) { + pruneLastCached(); + pendingCachedIds.add(node.getMessageId()); + } else { + setLastCachedId(SYNC_ADD, node.getMessageId()); + } + } private void pruneLastCached() { for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) { @@ -247,21 +299,22 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i if (future.isCancelled()) { it.remove(); } else { + // we don't want to wait for work to complete break; } } else { - // store complete - track via lastCachedId - setLastCachedId(candidate); + // complete + setLastCachedId(ASYNC_ADD, candidate); it.remove(); } } } - private void setLastCachedId(MessageId candidate) { - if (lastCachedId == null || lastCachedId.getFutureOrSequenceLong() == null) { // possibly null for topics - lastCachedId = candidate; - } else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedId.getFutureOrSequenceLong())) > 0) { - lastCachedId = candidate; + private void setLastCachedId(final int index, MessageId candidate) { + if (lastCachedIds[index] == null || lastCachedIds[index].getFutureOrSequenceLong() == null) { // possibly null for topics + lastCachedIds[index] = candidate; + } else if (Long.compare(((Long) 
candidate.getFutureOrSequenceLong()), ((Long) lastCachedIds[index].getFutureOrSequenceLong())) > 0) { + lastCachedIds[index] = candidate; } } @@ -351,7 +404,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public String toString() { return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled() - + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() + ",lastCachedId:" + lastCachedId + ",lastCachedId-seq:" + (lastCachedId != null ? lastCachedId.getFutureOrSequenceLong() : "null"); + + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() + + ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null") + + ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? 
lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null"); } protected abstract void doFillBatch() throws Exception; http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index 1f42d57..94dc817 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -94,7 +94,7 @@ class QueueStorePrefetch extends AbstractStoreCursor { @Override protected void setBatch(MessageId messageId) throws Exception { - LOG.trace("{} setBatch {} loc: {}", this, messageId, messageId.getEntryLocator()); + LOG.trace("{} setBatch {} seq: {}, loc: {}", this, messageId, messageId.getFutureOrSequenceLong(), messageId.getEntryLocator()); store.setBatch(messageId); batchResetNeeded = false; } http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index abdf4bf..eb5d1c4 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.broker.ConnectionContext; 
+import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; @@ -86,7 +87,7 @@ import org.slf4j.LoggerFactory; public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); - private static final int MAX_ASYNC_JOBS = 10000; + private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH; public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java index eb46f8f..38c85da 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java @@ -101,7 +101,6 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport { addNetworkConnector(broker); } broker.setSchedulePeriodForDestinationPurge(0); - //broker.getSystemUsage().setSendFailIfNoSpace(true); broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024l); @@ -406,7 +405,6 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport { int id = numMessages - val - 1; ActiveMQQueue compositeQ = new ActiveMQQueue("IN"); - //LOG.info("Send to: " + ((ActiveMQConnection) queueConnection).getBrokerName() + ", " + val + ", dest:" + compositeQ); Message textMessage = 
queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + id + " payload:" + payload); textMessage.setIntProperty("NUM", id); producer.send(compositeQ, textMessage); http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java index 0baf5c3..507e52e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java @@ -16,12 +16,9 @@ */ package org.apache.activemq.bugs; -import static org.junit.Assert.assertEquals; - import java.net.URI; import java.util.Date; import java.util.Enumeration; - import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; @@ -29,13 +26,10 @@ import javax.jms.MessageProducer; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; import org.junit.After; import org.junit.Before; @@ -43,6 +37,9 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; + public class AMQ4595Test { private static final Logger LOG = LoggerFactory.getLogger(AMQ4595Test.class); @@ -112,6 +109,8 @@ public class AMQ4595Test { } producerConnection.close(); + LOG.info("Mem usage after producer done: " + 
broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%"); + // Browse the queue. Connection connection = factory.createConnection(); connection.start(); @@ -131,6 +130,8 @@ public class AMQ4595Test { session.close(); connection.close(); + LOG.info("Mem usage after browser closed: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%"); + // The number of messages browsed should be equal to the number of messages sent. assertEquals(messageToSend, browsed);
