Repository: activemq Updated Branches: refs/heads/trunk 74d2c2425 -> 8b8f63008
isolate cursor storeHasMessage logic into durable topic sub cursor b/c only durable sub cursors have selectors that won't match, otherwise we should always read a page if the store has messages Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8b8f6300 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8b8f6300 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8b8f6300 Branch: refs/heads/trunk Commit: 8b8f6300801f318ce0ce365c4ae600fce3548244 Parents: 74d2c24 Author: gtully <gary.tu...@gmail.com> Authored: Tue Oct 21 23:21:45 2014 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Tue Oct 21 23:23:15 2014 +0100 ---------------------------------------------------------------------- .../region/cursors/AbstractStoreCursor.java | 17 ++++------------- .../region/cursors/TopicStorePrefetch.java | 20 +++++++++++++++++--- 2 files changed, 21 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8b8f6300/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index f2df96e..07d4351 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -42,7 +42,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i protected final PendingList batchList; private Iterator<MessageReference> iterator = null; protected boolean batchResetNeeded = false; - private boolean storeHasMessages = false; protected int size; private LinkedList<MessageId> pendingCachedIds = new LinkedList<>(); private static int SYNC_ADD = 0; @@ -66,13 +65,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i super.start(); resetBatch(); resetSize(); - setCacheEnabled(!this.storeHasMessages&&useCache); + setCacheEnabled(size==0&&useCache); } } protected void resetSize() { this.size = getStoreSize(); - this.storeHasMessages=this.size > 0; } @Override @@ -93,7 +91,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { boolean recovered = false; - storeHasMessages = true; if (recordUniqueId(message.getMessageId())) { if (!cached) { message.setRegionDestination(regionDestination); @@ -202,7 +199,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return result; } - public final synchronized boolean addMessageLast(MessageReference node) throws Exception { + public synchronized boolean addMessageLast(MessageReference node) throws Exception { boolean disableCache = false; if (hasSpace()) { if (!isCacheEnabled() && size==0 && isStarted() && useCache) { @@ -230,7 +227,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i syncWithStore(node.getMessage()); setCacheEnabled(false); } - this.storeHasMessages = true; size++; return true; } @@ -380,18 +376,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i resetBatch(); this.batchResetNeeded = false; } - if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) { - // avoid repeated trips to the store if there is nothing of interest - this.storeHasMessages = false; + if (this.batchList.isEmpty() && this.size >0) { try { doFillBatch(); } catch (Exception e) { LOG.error("{} - Failed to fill batch", this, e); throw new RuntimeException(e); } - if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) { - this.storeHasMessages = true; - } } } @@ -417,7 +408,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i @Override public String toString() { return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded - + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled() + + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled() + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() + ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null") + ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null"); http://git-wip-us.apache.org/repos/asf/activemq/blob/8b8f6300/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index 9e02e4e..811531e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -40,6 +40,8 @@ class TopicStorePrefetch extends AbstractStoreCursor { private final String subscriberName; private final Subscription subscription; private byte lastRecoveredPriority = 9; + private boolean storeHasMessages = false; + /** * @param topic * @param clientId @@ -54,6 +56,7 @@ class TopicStorePrefetch extends AbstractStoreCursor { this.maxProducersToAudit=32; this.maxAuditDepth=10000; resetSize(); + this.storeHasMessages=this.size > 0; } public boolean recoverMessageReference(MessageId messageReference) throws Exception { @@ -65,8 +68,13 @@ class TopicStorePrefetch extends AbstractStoreCursor { batchList.addMessageFirst(node); size++; } - - + + @Override + public final synchronized boolean addMessageLast(MessageReference node) throws Exception { + this.storeHasMessages = super.addMessageLast(node); + return this.storeHasMessages; + } + @Override public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { LOG.trace("{} recover: {}, priority: {}", this, message.getMessageId(), message.getPriority()); @@ -78,6 +86,7 @@ class TopicStorePrefetch extends AbstractStoreCursor { if (recovered && !cached) { lastRecoveredPriority = message.getPriority(); } + storeHasMessages = true; } return recovered; } @@ -110,8 +119,13 @@ class TopicStorePrefetch extends AbstractStoreCursor { @Override protected void doFillBatch() throws Exception { + // avoid repeated trips to the store if there is nothing of interest + this.storeHasMessages = false; this.store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this); + if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) { + this.storeHasMessages = true; + } } public byte getLastRecoveredPriority() { @@ -129,6 +143,6 @@ class TopicStorePrefetch extends AbstractStoreCursor { @Override public String toString() { - return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString(); + return "TopicStorePrefetch(" + clientId + "," + subscriberName + ",storeHasMessages=" + this.storeHasMessages +") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString(); } }