Repository: activemq-6 Updated Branches: refs/heads/master 0eb6ebda2 -> 1491f4a12
ACTIVEMQ6-54 Fixing tests broken after Paging fix https://issues.apache.org/jira/browse/ACTIVEMQ6-54 Changing the order of depaging introduced an extra check that needs to be checked now. This will probably take care of the issue by checking if the page is complete before depage. Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/09490cdb Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/09490cdb Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/09490cdb Branch: refs/heads/master Commit: 09490cdba3278f53980a906f35893e7b4c57d7fd Parents: 0eb6ebd Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Wed Dec 10 22:03:10 2014 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Wed Dec 10 22:06:35 2014 -0500 ---------------------------------------------------------------------- .../cursor/impl/PageCursorProviderImpl.java | 54 +++++---- .../tests/integration/client/PagingTest.java | 114 +++++++++++++++++++ 2 files changed, 146 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/09490cdb/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java index 32f30e8..3df0b78 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -425,28 +425,7 @@ public class PageCursorProviderImpl implements PageCursorProvider // on that case we need to move to verify it in a different way if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0) { - boolean complete = true; - - for (PageSubscription cursor : cursorList) - { - if (!cursor.isComplete(minPage)) - { - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) - { - ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + " was considered incomplete at page " + minPage); - } - - complete = false; - break; - } - else - { - if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) - { - ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + "was considered **complete** at page " + minPage); - } - } - } + boolean complete = checkPageCompletion(cursorList, minPage); if (!pagingStore.isStarted()) { @@ -475,6 +454,10 @@ public class PageCursorProviderImpl implements PageCursorProvider for (long i = pagingStore.getFirstPage(); i < minPage; i++) { + if (!checkPageCompletion(cursorList, i)) + { + break; + } Page page = pagingStore.depage(); if (page == null) { @@ -577,6 +560,33 @@ public class PageCursorProviderImpl implements PageCursorProvider } + + private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long minPage) + { + boolean complete = true; + + for (PageSubscription cursor : cursorList) + { + if (!cursor.isComplete(minPage)) + { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + " was considered incomplete at page " + minPage); + } + + complete = false; + break; + } + else + { + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + "was considered **complete** at page " + minPage); + } + } + } + return complete; + } /** * @return */ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/09490cdb/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java index ff333ab..0e4751d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java @@ -113,6 +113,120 @@ public class PagingTest extends ServiceTestBase } @Test + public void testPageOnLargeMessageMultipleQueues() throws Exception + { + Configuration config = createDefaultConfig(); + + final int PAGE_MAX = 20 * 1024; + + final int PAGE_SIZE = 10 * 1024; + + HashMap<String, AddressSettings> map = new HashMap<String, AddressSettings>(); + + AddressSettings value = new AddressSettings(); + map.put(ADDRESS.toString(), value); + ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map); + server.start(); + + final int numberOfBytes = 1024; + + locator.setBlockOnNonDurableSend(true); + locator.setBlockOnDurableSend(true); + locator.setBlockOnAcknowledge(true); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + ClientSession session = sf.createSession(null, null, false, true, true, false, 0); + + session.createQueue(ADDRESS, ADDRESS.concat("-0"), null, true); + session.createQueue(ADDRESS, ADDRESS.concat("-1"), null, true); + + ClientProducer producer = session.createProducer(ADDRESS); + + ClientMessage message = null; + + for (int i = 0; i < 201; i++) + { + message = session.createMessage(true); + + message.getBodyBuffer().writerIndex(0); + + message.getBodyBuffer().writeBytes(new byte[numberOfBytes]); + + for (int j = 1; j <= numberOfBytes; j++) + { + message.getBodyBuffer().writeInt(j); + } + + producer.send(message); + } + + + session.close(); + + server.stop(); + + server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map); + server.start(); + + sf = createSessionFactory(locator); + + for (int ad = 0; ad < 2; ad++) + { + session = sf.createSession(false, false, false); + + ClientConsumer consumer = session.createConsumer(ADDRESS.concat("-" + ad)); + + session.start(); + + for (int i = 0; i < 201; i++) + { + ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME); + + Assert.assertNotNull(message2); + + message2.acknowledge(); + + Assert.assertNotNull(message2); + } + + try + { + if (ad > -1) + { + session.commit(); + } + else + { + session.rollback(); + for (int i = 0; i < 100; i++) + { + ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME); + + Assert.assertNotNull(message2); + + message2.acknowledge(); + + Assert.assertNotNull(message2); + } + session.commit(); + + } + } + catch (Throwable e) + { + System.err.println("here!!!!!!!"); + e.printStackTrace(); + System.exit(-1); + } + + consumer.close(); + + session.close(); + } + } + + @Test public void testPageCleanup() throws Exception { clearDataRecreateServerDirs();