Repository: activemq-6
Updated Branches:
  refs/heads/master 0eb6ebda2 -> 1491f4a12


ACTIVEMQ6-54 Fixing tests broken after Paging fix

https://issues.apache.org/jira/browse/ACTIVEMQ6-54

Changing the order of depaging introduced an extra condition that must now be 
verified.
This will probably take care of the issue by checking whether each page is 
complete before depaging it.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/09490cdb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/09490cdb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/09490cdb

Branch: refs/heads/master
Commit: 09490cdba3278f53980a906f35893e7b4c57d7fd
Parents: 0eb6ebd
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Wed Dec 10 22:03:10 2014 -0500
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Dec 10 22:06:35 2014 -0500

----------------------------------------------------------------------
 .../cursor/impl/PageCursorProviderImpl.java     |  54 +++++----
 .../tests/integration/client/PagingTest.java    | 114 +++++++++++++++++++
 2 files changed, 146 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/09490cdb/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
 
b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
index 32f30e8..3df0b78 100644
--- 
a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ 
b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -425,28 +425,7 @@ public class PageCursorProviderImpl implements 
PageCursorProvider
             // on that case we need to move to verify it in a different way
             if (minPage == pagingStore.getCurrentWritingPage() && 
pagingStore.getCurrentPage().getNumberOfMessages() > 0)
             {
-               boolean complete = true;
-
-               for (PageSubscription cursor : cursorList)
-               {
-                  if (!cursor.isComplete(minPage))
-                  {
-                     if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
-                     {
-                        ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + 
" was considered incomplete at page " + minPage);
-                     }
-
-                     complete = false;
-                     break;
-                  }
-                  else
-                  {
-                     if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
-                     {
-                        ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + 
"was considered **complete** at page " + minPage);
-                     }
-                  }
-               }
+               boolean complete = checkPageCompletion(cursorList, minPage);
 
                if (!pagingStore.isStarted())
                {
@@ -475,6 +454,10 @@ public class PageCursorProviderImpl implements 
PageCursorProvider
 
             for (long i = pagingStore.getFirstPage(); i < minPage; i++)
             {
+               if (!checkPageCompletion(cursorList, i))
+               {
+                  break;
+               }
                Page page = pagingStore.depage();
                if (page == null)
                {
@@ -577,6 +560,33 @@ public class PageCursorProviderImpl implements 
PageCursorProvider
 
    }
 
+
+   private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, 
long minPage)
+   {
+      boolean complete = true;
+
+      for (PageSubscription cursor : cursorList)
+      {
+         if (!cursor.isComplete(minPage))
+         {
+            if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
+            {
+               ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + " was 
considered incomplete at page " + minPage);
+            }
+
+            complete = false;
+            break;
+         }
+         else
+         {
+            if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
+            {
+               ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + "was 
considered **complete** at page " + minPage);
+            }
+         }
+      }
+      return complete;
+   }
    /**
     * @return
     */

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/09490cdb/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
index ff333ab..0e4751d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
@@ -113,6 +113,120 @@ public class PagingTest extends ServiceTestBase
    }
 
    @Test
+   public void testPageOnLargeMessageMultipleQueues() throws Exception
+   {
+      Configuration config = createDefaultConfig();
+
+      final int PAGE_MAX = 20 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+
+      HashMap<String, AddressSettings> map = new HashMap<String, 
AddressSettings>();
+
+      AddressSettings value = new AddressSettings();
+      map.put(ADDRESS.toString(), value);
+      ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, 
map);
+      server.start();
+
+      final int numberOfBytes = 1024;
+
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+
+      ClientSessionFactory sf = 
addSessionFactory(createSessionFactory(locator));
+
+      ClientSession session = sf.createSession(null, null, false, true, true, 
false, 0);
+
+      session.createQueue(ADDRESS, ADDRESS.concat("-0"), null, true);
+      session.createQueue(ADDRESS, ADDRESS.concat("-1"), null, true);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      ClientMessage message = null;
+
+      for (int i = 0; i < 201; i++)
+      {
+         message = session.createMessage(true);
+
+         message.getBodyBuffer().writerIndex(0);
+
+         message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+         for (int j = 1; j <= numberOfBytes; j++)
+         {
+            message.getBodyBuffer().writeInt(j);
+         }
+
+         producer.send(message);
+      }
+
+
+      session.close();
+
+      server.stop();
+
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+      server.start();
+
+      sf = createSessionFactory(locator);
+
+      for (int ad = 0; ad < 2; ad++)
+      {
+         session = sf.createSession(false, false, false);
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS.concat("-" + 
ad));
+
+         session.start();
+
+         for (int i = 0; i < 201; i++)
+         {
+            ClientMessage message2 = 
consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+            Assert.assertNotNull(message2);
+
+            message2.acknowledge();
+
+            Assert.assertNotNull(message2);
+         }
+
+         try
+         {
+            if (ad > -1)
+            {
+               session.commit();
+            }
+            else
+            {
+               session.rollback();
+               for (int i = 0; i < 100; i++)
+               {
+                  ClientMessage message2 = 
consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+                  Assert.assertNotNull(message2);
+
+                  message2.acknowledge();
+
+                  Assert.assertNotNull(message2);
+               }
+               session.commit();
+
+            }
+         }
+         catch (Throwable e)
+         {
+            System.err.println("here!!!!!!!");
+            e.printStackTrace();
+            System.exit(-1);
+         }
+
+         consumer.close();
+
+         session.close();
+      }
+   }
+
+   @Test
    public void testPageCleanup() throws Exception
    {
       clearDataRecreateServerDirs();

Reply via email to