Repository: activemq Updated Branches: refs/heads/trunk 76e29bdf9 -> 4705f95be
https://issues.apache.org/jira/browse/AMQ-5266 https://issues.apache.org/jira/browse/AMQ-4485 - tidy up exception trace and async check, fix leveldb async test regressions Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4705f95b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4705f95b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4705f95b Branch: refs/heads/trunk Commit: 4705f95becc983101d3534d573a45b157b400191 Parents: 76e29bd Author: gtully <[email protected]> Authored: Fri Oct 17 11:24:48 2014 +0100 Committer: gtully <[email protected]> Committed: Fri Oct 17 11:25:47 2014 +0100 ---------------------------------------------------------------------- .../broker/region/cursors/AbstractStoreCursor.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4705f95b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index c4bf985..9998152 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker.region.cursors; import java.util.Iterator; import java.util.LinkedList; import java.util.ListIterator; +import java.util.concurrent.CancellationException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -249,9 +250,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i try { future.get(5, TimeUnit.SECONDS); setLastCachedId(ASYNC_ADD, lastPending); + } catch (CancellationException ok) { + continue; } catch (TimeoutException potentialDeadlock) { - LOG.warn("{} timed out waiting for async add", this, potentialDeadlock); - } catch (Exception cancelledOrTimeOutOrErrorWorstCaseWeReplay) {cancelledOrTimeOutOrErrorWorstCaseWeReplay.printStackTrace();} + LOG.debug("{} timed out waiting for async add", this, potentialDeadlock); + } catch (Exception worstCaseWeReplay) { + LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay); + } } else { setLastCachedId(ASYNC_ADD, lastPending); } @@ -259,7 +264,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } if (lastCachedIds[ASYNC_ADD] != null) { // ensure we don't skip current possibly sync add b/c we waited on the future - if (currentAdd.isRecievedByDFBridge() || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) { + if (isAsync(currentAdd) || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) { setBatch(lastCachedIds[ASYNC_ADD]); } } @@ -272,7 +277,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } private void trackLastCached(MessageReference node) { - if (node.getMessageId().getFutureOrSequenceLong() instanceof Future || node.getMessage().isRecievedByDFBridge()) { + if (isAsync(node.getMessage())) { pruneLastCached(); pendingCachedIds.add(node.getMessageId()); } else { @@ -280,6 +285,10 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } + private static final boolean isAsync(Message message) { + return message.isRecievedByDFBridge() || message.getMessageId().getFutureOrSequenceLong() instanceof Future; + } + private void pruneLastCached() { for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) { MessageId candidate = it.next();
