ARTEMIS-828 Queue browsing can be out of sync while paging https://issues.apache.org/jira/browse/ARTEMIS-828
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bfb9bedb Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bfb9bedb Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bfb9bedb Branch: refs/heads/master Commit: bfb9bedb2d7a3d8ab5e336509915eb6ecafaefb3 Parents: e002125 Author: Clebert Suconic <[email protected]> Authored: Thu Oct 27 17:30:34 2016 -0400 Committer: Clebert Suconic <[email protected]> Committed: Fri Oct 28 16:54:58 2016 -0400 ---------------------------------------------------------------------- .../core/management/impl/QueueControlImpl.java | 8 +-- .../core/paging/cursor/PageSubscription.java | 5 +- .../impl/PageSubscriptionCounterImpl.java | 1 - .../cursor/impl/PageSubscriptionImpl.java | 21 ++++-- .../activemq/artemis/core/server/Queue.java | 2 +- .../artemis/core/server/impl/QueueImpl.java | 67 ++++++++++++++------ .../core/server/impl/ScaleDownHandler.java | 4 +- .../core/server/impl/ServerConsumerImpl.java | 2 +- .../impl/ScheduledDeliveryHandlerTest.java | 2 +- .../integration/paging/PagingSendTest.java | 4 +- .../unit/core/postoffice/impl/FakeQueue.java | 2 +- .../unit/core/server/impl/QueueImplTest.java | 2 +- 12 files changed, 81 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index cfa8aa5..85bad25 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -410,7 +410,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { Filter filter = FilterImpl.createFilter(filterStr); List<Map<String, Object>> messages = new ArrayList<>(); queue.flushExecutor(); - try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) { + try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) { while (iterator.hasNext()) { MessageReference ref = iterator.next(); if (filter == null || filter.match(ref.getMessage())) { @@ -446,7 +446,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { try { List<Map<String, Object>> messages = new ArrayList<>(); queue.flushExecutor(); - try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) { + try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) { // returns just the first, as it's the first only if (iterator.hasNext()) { MessageReference ref = iterator.next(); @@ -499,7 +499,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { if (filter == null) { return getMessageCount(); } else { - try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) { + try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) { int count = 0; while (iterator.hasNext()) { MessageReference ref = iterator.next(); @@ -895,7 +895,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { ArrayList<CompositeData> c = new ArrayList<>(); Filter filter = FilterImpl.createFilter(filterStr); queue.flushExecutor(); - try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) { + try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) { while (iterator.hasNext() && currentPageSize++ < pageSize) { MessageReference ref = iterator.next(); if (filter == null || filter.match(ref.getMessage())) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java index 89c6d44..6e569c1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java @@ -56,7 +56,10 @@ public interface PageSubscription { LinkedListIterator<PagedReference> iterator(); - // To be called when the cursor is closed for good. Most likely when the queue is deleted + LinkedListIterator<PagedReference> iterator(boolean jumpRemoves); + + + // To be called when the cursor is closed for good. Most likely when the queue is deleted void destroy() throws Exception; void scheduleCleanupCheck(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java index e01098d..01ad778 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java @@ -251,7 +251,6 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { recordID = -1; value.set(0); - added.set(0); incrementRecords.clear(); } } finally { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index c1c54a2..063722c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -351,6 +351,11 @@ final class PageSubscriptionImpl implements PageSubscription { return new CursorIterator(); } + @Override + public PageIterator iterator(boolean browsing) { + return new CursorIterator(browsing); + } + private PagedReference internalGetNext(final PagePosition pos) { PagePosition retPos = pos.nextMessage(); @@ -1100,6 +1105,8 @@ final class PageSubscriptionImpl implements PageSubscription { private volatile PagedReference lastRedelivery = null; + private final boolean browsing; + // We only store the position for redeliveries. They will be read from the SoftCache again during delivery. private final java.util.Queue<PagePosition> redeliveries = new LinkedList<>(); @@ -1109,7 +1116,13 @@ final class PageSubscriptionImpl implements PageSubscription { */ private volatile PagedReference cachedNext; + private CursorIterator(boolean browsing) { + this.browsing = browsing; + } + + private CursorIterator() { + this.browsing = false; } @Override @@ -1199,7 +1212,7 @@ final class PageSubscriptionImpl implements PageSubscription { PageCursorInfo info = getPageInfo(message.getPosition().getPageNr()); - if (info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) { + if (!browsing && info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) { continue; } @@ -1225,7 +1238,7 @@ final class PageSubscriptionImpl implements PageSubscription { // nothing // is being changed. That's why the false is passed as a parameter here - if (info != null && info.isRemoved(message.getPosition())) { + if (!browsing && info != null && info.isRemoved(message.getPosition())) { valid = false; } } @@ -1237,10 +1250,10 @@ final class PageSubscriptionImpl implements PageSubscription { if (valid) { match = match(message.getMessage()); - if (!match) { + if (!browsing && !match) { processACK(message.getPosition()); } - } else if (ignored) { + } else if (!browsing && ignored) { positionIgnored(message.getPosition()); } } while (!match); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 0dcef3d..52cd2f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -195,7 +195,7 @@ public interface Queue extends Bindable { */ LinkedListIterator<MessageReference> iterator(); - LinkedListIterator<MessageReference> totalIterator(); + LinkedListIterator<MessageReference> browserIterator(); SimpleString getExpiryAddress(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index b70fe8d..56a33ef 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -867,8 +867,8 @@ public class QueueImpl implements Queue { } @Override - public TotalQueueIterator totalIterator() { - return new TotalQueueIterator(); + public QueueBrowserIterator browserIterator() { + return new QueueBrowserIterator(); } @Override @@ -2863,17 +2863,23 @@ public class QueueImpl implements Queue { //Readonly (no remove) iterator over the messages in the queue, in order of //paging store, intermediateMessageReferences and MessageReferences - private class TotalQueueIterator implements LinkedListIterator<MessageReference> { + private class QueueBrowserIterator implements LinkedListIterator<MessageReference> { - LinkedListIterator<PagedReference> pageIter = null; + LinkedListIterator<PagedReference> pagingIterator = null; LinkedListIterator<MessageReference> messagesIterator = null; + private LinkedListIterator<PagedReference> getPagingIterator() { + if (pagingIterator == null) { + pagingIterator = pageSubscription.iterator(true); + } + return pagingIterator; + } + Iterator lastIterator = null; - private TotalQueueIterator() { - if (pageSubscription != null) { - pageIter = pageSubscription.iterator(); - } + MessageReference cachedNext = null; + + private QueueBrowserIterator() { messagesIterator = new SynchronizedIterator(messageReferences.iterator()); } @@ -2883,9 +2889,9 @@ public class QueueImpl implements Queue { lastIterator = messagesIterator; return true; } - if (pageIter != null) { - if (pageIter.hasNext()) { - lastIterator = pageIter; + if (getPagingIterator() != null) { + if (getPagingIterator().hasNext()) { + lastIterator = getPagingIterator(); return true; } } @@ -2893,16 +2899,37 @@ public class QueueImpl implements Queue { return false; } + + @Override public MessageReference next() { - if (messagesIterator != null && messagesIterator.hasNext()) { - MessageReference msg = messagesIterator.next(); - return msg; + + if (cachedNext != null) { + try { + return cachedNext; + } finally { + cachedNext = null; + } + } - if (pageIter != null) { - if (pageIter.hasNext()) { - lastIterator = pageIter; - return pageIter.next(); + while (true) { + if (messagesIterator != null && messagesIterator.hasNext()) { + MessageReference msg = messagesIterator.next(); + if (msg.isPaged()) { + System.out.println("** Rejecting because it's paged " + msg.getMessage()); + continue; + } +// System.out.println("** Returning because it's not paged " + msg.getMessage()); + return msg; + } else { + break; + } + } + if (getPagingIterator() != null) { + if (getPagingIterator().hasNext()) { + lastIterator = getPagingIterator(); + MessageReference ref = getPagingIterator().next(); + return ref; } } @@ -2922,8 +2949,8 @@ public class QueueImpl implements Queue { @Override public void close() { - if (pageIter != null) { - pageIter.close(); + if (getPagingIterator() != null) { + getPagingIterator().close(); } if (messagesIterator != null) { messagesIterator.close(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java index b763ff2..dc62676 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java @@ -165,7 +165,7 @@ public class ScaleDownHandler { for (Queue loopQueue : queues) { logger.debug("Scaling down messages on address " + address + " / performing loop on queue " + loopQueue); - try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.totalIterator()) { + try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.browserIterator()) { while (messagesIterator.hasNext()) { MessageReference messageReference = messagesIterator.next(); @@ -249,7 +249,7 @@ public class ScaleDownHandler { for (Queue queue : queues) { // using auto-closeable - try (LinkedListIterator<MessageReference> messagesIterator = queue.totalIterator()) { + try (LinkedListIterator<MessageReference> messagesIterator = queue.browserIterator()) { // loop through every message of this queue while (messagesIterator.hasNext()) { MessageReference messageRef = messagesIterator.next(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 1318ff3..98a9c84 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -206,7 +206,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { this.creationTime = System.currentTimeMillis(); if (browseOnly) { - browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator()); + browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator()); } else { messageQueue.addConsumer(this); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index d82f7d3..55a287a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1212,7 +1212,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public LinkedListIterator<MessageReference> totalIterator() { + public LinkedListIterator<MessageReference> browserIterator() { return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java index ca8a9a1..1f0d7e0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java @@ -308,7 +308,7 @@ public class PagingSendTest extends ActiveMQTestBase { * duplicates that may have happened before this point). */ public void checkBatchMessagesAreNotPagedTwice(Queue queue) throws Exception { - LinkedListIterator<MessageReference> pageIterator = queue.totalIterator(); + LinkedListIterator<MessageReference> pageIterator = queue.browserIterator(); Set<String> messageOrderSet = new HashSet<>(); @@ -344,7 +344,7 @@ public class PagingSendTest extends ActiveMQTestBase { * duplicates that may have happened before this point). */ protected int processCountThroughIterator(Queue queue) throws Exception { - LinkedListIterator<MessageReference> pageIterator = queue.totalIterator(); + LinkedListIterator<MessageReference> pageIterator = queue.browserIterator(); int count = 0; while (pageIterator.hasNext()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index bba5dc1..9a20d70 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -604,7 +604,7 @@ public class FakeQueue implements Queue { } @Override - public LinkedListIterator<MessageReference> totalIterator() { + public LinkedListIterator<MessageReference> browserIterator() { // TODO Auto-generated method stub return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index b9bdba7..804429f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -1274,7 +1274,7 @@ public class QueueImplTest extends ActiveMQTestBase { locator.close(); Queue queue = ((LocalQueueBinding) server.getPostOffice().getBinding(new SimpleString(MY_QUEUE))).getQueue(); - LinkedListIterator<MessageReference> totalIterator = queue.totalIterator(); + LinkedListIterator<MessageReference> totalIterator = queue.browserIterator(); try { int i = 0;
