Repository: activemq Updated Branches: refs/heads/master b1d8e66ea -> b4e35fe8a
AMQ-4181 - revert mod to testQueueBrowserWith2ConsumersInterleaved which caused intermittent CI failure - browse is a snapshot at time of creation. tidy up some gaps in pagein logic sync Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b4e35fe8 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b4e35fe8 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b4e35fe8 Branch: refs/heads/master Commit: b4e35fe8a355c0fd5fe8935ab583e2886121946b Parents: b1d8e66 Author: gtully <[email protected]> Authored: Mon May 23 13:29:25 2016 +0100 Committer: gtully <[email protected]> Committed: Mon May 23 13:29:39 2016 +0100 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 37 ++++++++------------ .../org/apache/activemq/broker/BrokerTest.java | 9 ++++- 2 files changed, 23 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/b4e35fe8/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index f025998..318f558 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -410,14 +410,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index browser.incrementQueueRef(); } - void done() { - try { - browser.decrementQueueRef(); - } catch (Exception e) { - LOG.warn("decrement ref on browser: " + browser, e); - } - } - public QueueBrowserSubscription getBrowser() { return browser; } @@ -1602,12 +1594,7 @@ public class Queue extends BaseDestination 
implements Task, UsageListener, Index pagedInPendingDispatchLock.readLock().unlock(); } - // Perhaps we should page always into the pagedInPendingDispatch - // list if - // !messages.isEmpty(), and then if - // !pagedInPendingDispatch.isEmpty() - // then we do a dispatch. - boolean hasBrowsers = browserDispatches.size() > 0; + boolean hasBrowsers = !browserDispatches.isEmpty(); if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) { try { @@ -1618,12 +1605,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } if (hasBrowsers) { - PendingList alreadyDispatchedMessages = isPrioritizedMessages() ? + PendingList messagesInMemory = isPrioritizedMessages() ? new PrioritizedPendingList() : new OrderedPendingList(); pagedInMessagesLock.readLock().lock(); - try{ - alreadyDispatchedMessages.addAll(pagedInMessages); - }finally { + try { + messagesInMemory.addAll(pagedInMessages); + } finally { pagedInMessagesLock.readLock().unlock(); } @@ -1636,9 +1623,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index QueueBrowserSubscription browser = browserDispatch.getBrowser(); - LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size()); + LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, messagesInMemory.size()); boolean added = false; - for (MessageReference node : alreadyDispatchedMessages) { + for (MessageReference node : messagesInMemory) { if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) { msgContext.setMessageReference(node); if (browser.matches(node, msgContext)) { @@ -1902,7 +1889,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index List<QueueMessageReference> result = null; PendingList resultList = null; - int toPageIn = Math.min(maxPageSize, messages.size()); + int toPageIn = maxPageSize; + 
messagesLock.readLock().lock(); + try { + toPageIn = Math.min(toPageIn, messages.size()); + } finally { + messagesLock.readLock().unlock(); + } int pagedInPendingSize = 0; pagedInPendingDispatchLock.readLock().lock(); try { @@ -1913,7 +1906,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index if (isLazyDispatch() && !force) { // Only page in the minimum number of messages which can be // dispatched immediately. - toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn); + toPageIn = Math.min(toPageIn, getConsumerMessageCountBeforeFull()); } if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/activemq/blob/b4e35fe8/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java index 769cdbf..200d215 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java @@ -265,7 +265,9 @@ public class BrokerTest extends BrokerTestSupport { messages.add(m1); } - for (int i = 0; i < 4; i++) { + // a browse is a snapshot - only guarantee to see messages produced before + // the browser + for (int i = 0; i < 1; i++) { Message m1 = messages.get(i); Message m2 = receiveMessage(connection2); assertNotNull("m2 is null for index: " + i, m2); @@ -275,6 +277,11 @@ public class BrokerTest extends BrokerTestSupport { assertNoMessagesLeft(connection1); assertNoMessagesLeft(connection2); + + connection1.request(closeConnectionInfo(connectionInfo1)); + connection1.stop(); + connection2.request(closeConnectionInfo(connectionInfo2)); + connection2.stop(); }
