This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new b92cde1  ARTEMIS-2430 Avoid data loss when live page cache evicted
     new 8963cd9  This closes #2766
b92cde1 is described below

commit b92cde165b0ce822972eee723a950653c3ee5ff4
Author: Wei Yang <[email protected]>
AuthorDate: Wed Jul 24 19:42:49 2019 +0800

    ARTEMIS-2430 Avoid data loss when live page cache evicted
---
 .../paging/cursor/impl/PageCursorProviderImpl.java |  6 +++
 .../activemq/artemis/core/paging/impl/Page.java    |  4 ++
 .../tests/integration/paging/PagingTest.java       | 63 ++++++++++++++++++++++
 3 files changed, 73 insertions(+)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 4e03b3b..3abd886 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -161,6 +161,12 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
             if (!pagingStore.checkPageFileExists((int) pageId)) {
                return null;
             }
+            Page currentPage = pagingStore.getCurrentPage();
+            // Live page cache might be cleared by gc, we need to retrieve it 
otherwise partially written page cache is being returned
+            if (currentPage != null && currentPage.getPageId() == pageId && 
(cache = currentPage.getLiveCache()) != null) {
+               softCache.put(cache.getPageId(), cache);
+               return cache;
+            }
             inProgressReadPage = inProgressReadPages.get(pageId);
             if (inProgressReadPage == null) {
                final CompletableFuture<PageCache> readPage = new 
CompletableFuture<>();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 5f98b4c..fb236f8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -100,6 +100,10 @@ public final class Page implements Comparable<Page> {
       this.pageCache = pageCache;
    }
 
+   public LivePageCache getLiveCache() {
+      return pageCache;
+   }
+
    public synchronized List<PagedMessage> read(StorageManager storage) throws 
Exception {
       if (logger.isDebugEnabled()) {
          logger.debug("reading page " + this.pageId + " on address = " + 
storeName);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 5dd5cc2..19eee08 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -6586,6 +6586,69 @@ public class PagingTest extends ActiveMQTestBase {
       server.stop();
    }
 
+   // We send messages to page, evict live page cache, send last message when 
mid consumed, and expect to receive all messages
+   @Test
+   public void testLivePageCacheEvicted() throws Throwable {
+      clearDataRecreateServerDirs();
+
+      Configuration config = 
createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+      server = createServer(true, config, PagingTest.PAGE_SIZE, 
PagingTest.PAGE_MAX);
+      server.start();
+
+      try {
+         ServerLocator locator = 
createInVMNonHALocator().setBlockOnDurableSend(true);
+         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSession session = sf.createSession(true, true, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true);
+
+         PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+         store.startPaging();
+
+         ClientProducer prod = session.createProducer(ADDRESS);
+         int num = 10;
+         for (int i = 0; i < num; i++) {
+            ClientMessage msg = session.createMessage(true);
+            msg.putIntProperty("index", i);
+            prod.send(msg);
+         }
+
+         session.start();
+         ClientConsumer cons = session.createConsumer(ADDRESS);
+         ClientMessage msgReceivedCons = null;
+         // simulate the live page cache evicted
+         store.getCursorProvider().clearCache();
+         for (int i = 0; i < num; i++) {
+            msgReceivedCons = cons.receive(1000);
+            assertNotNull(msgReceivedCons);
+            assertTrue(msgReceivedCons.getIntProperty("index") == i);
+            msgReceivedCons.acknowledge();
+
+            session.commit();
+
+            if (i == num / 2) {
+               ClientMessage msg = session.createMessage(true);
+               msg.putIntProperty("index", num);
+               prod.send(msg);
+            }
+         }
+
+         msgReceivedCons = cons.receive(1000);
+         assertNotNull(msgReceivedCons);
+         assertTrue(msgReceivedCons.getIntProperty("index") == num);
+         msgReceivedCons.acknowledge();
+
+         assertNull(cons.receiveImmediate());
+
+         session.commit();
+         session.close();
+
+         waitForNotPaging(store);
+      } finally {
+         server.stop();
+      }
+   }
+
    @Override
    protected Configuration createDefaultConfig(final int serverID, final 
boolean netty) throws Exception {
       Configuration configuration = super.createDefaultConfig(serverID, netty);

Reply via email to