This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 54b0094cd6d8b9f488dedeea60449d4cc58a1ef6
Author: franz1981 <[email protected]>
AuthorDate: Tue Jan 5 11:06:58 2021 +0100

    ARTEMIS-3049 Simplify PageCache API
---
 .../{LivePageCache.java => BulkPageCache.java}     |  4 +--
 .../artemis/core/paging/cursor/LivePageCache.java  |  2 +-
 .../artemis/core/paging/cursor/PageCache.java      | 13 ++------
 .../core/paging/cursor/PageCursorProvider.java     |  2 +-
 .../core/paging/cursor/impl/LivePageCacheImpl.java |  8 -----
 .../core/paging/cursor/impl/PageCacheImpl.java     | 24 +++-----------
 .../paging/cursor/impl/PageCursorProviderImpl.java | 38 ++++++++--------------
 .../core/paging/cursor/impl/PageReader.java        | 30 +++++++----------
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  4 +--
 .../core/paging/cursor/impl/PageReaderTest.java    |  6 ++--
 10 files changed, 41 insertions(+), 90 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/BulkPageCache.java
similarity index 90%
copy from 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java
copy to 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/BulkPageCache.java
index 0d09bc4..98ef786 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/BulkPageCache.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor;
 
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 
-public interface LivePageCache extends PageCache {
+public interface BulkPageCache extends PageCache {
 
-   void addLiveMessage(PagedMessage message);
+    PagedMessage[] getMessages();
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java
index 0d09bc4..01f09a8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor;
 
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 
-public interface LivePageCache extends PageCache {
+public interface LivePageCache extends BulkPageCache {
 
    void addLiveMessage(PagedMessage message);
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
index 646b568..a79dc94 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
@@ -19,28 +19,19 @@ package org.apache.activemq.artemis.core.paging.cursor;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap;
 
-public interface PageCache extends SoftValueLongObjectHashMap.ValueCache {
+public interface PageCache extends SoftValueLongObjectHashMap.ValueCache, 
AutoCloseable {
 
    long getPageId();
 
    int getNumberOfMessages();
 
-   void setMessages(PagedMessage[] messages);
-
-   PagedMessage[] getMessages();
-
-   /**
-    * @return whether this cache is still being updated
-    */
-   @Override
-   boolean isLive();
-
    /**
     * @param pagePosition page position
     * @return
     */
    PagedMessage getMessage(PagePosition pagePosition);
 
+   @Override
    void close();
 
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
index 29ce8a0..15d78b0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
@@ -33,7 +33,7 @@ public interface PageCursorProvider {
 
    PagedReference newReference(PagePosition pos, PagedMessage msg, 
PageSubscription sub);
 
-   void addPageCache(PageCache cache);
+   void addLivePageCache(LivePageCache cache);
 
    /**
     * @param queueId The cursorID should be the same as the queueId associated 
for persistence
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
index 4f93e36..260b097 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
@@ -67,14 +67,6 @@ public final class LivePageCacheImpl implements 
LivePageCache {
    }
 
    @Override
-   public void setMessages(PagedMessage[] messages) {
-      // This method shouldn't be called on liveCache, but we will provide the 
implementation for it anyway
-      for (PagedMessage message : messages) {
-         addLiveMessage(message);
-      }
-   }
-
-   @Override
    public PagedMessage getMessage(PagePosition pagePosition) {
       final int messageNr = pagePosition.getMessageNr();
       if (messageNr < 0) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
index a350ceb..84c0cac 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
@@ -17,32 +17,23 @@
 package org.apache.activemq.artemis.core.paging.cursor.impl;
 
 import org.apache.activemq.artemis.core.paging.PagedMessage;
-import org.apache.activemq.artemis.core.paging.cursor.PageCache;
+import org.apache.activemq.artemis.core.paging.cursor.BulkPageCache;
 import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 
 /**
  * The caching associated to a single page.
  */
-class PageCacheImpl implements PageCache {
+class PageCacheImpl implements BulkPageCache {
 
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private PagedMessage[] messages;
+   private final PagedMessage[] messages;
 
    private final long pageId;
 
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   PageCacheImpl(final long pageId) {
+   PageCacheImpl(final long pageId, PagedMessage[] messages) {
       this.pageId = pageId;
+      this.messages = messages;
    }
 
-   // Public --------------------------------------------------------
-
    @Override
    public PagedMessage getMessage(PagePosition pagePosition) {
       if (pagePosition.getMessageNr() < messages.length) {
@@ -58,11 +49,6 @@ class PageCacheImpl implements PageCache {
    }
 
    @Override
-   public void setMessages(final PagedMessage[] messages) {
-      this.messages = messages;
-   }
-
-   @Override
    public int getNumberOfMessages() {
       return messages.length;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 7ace24b..9acffb3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -28,8 +28,10 @@ import io.netty.util.collection.LongObjectHashMap;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
 import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
 import org.apache.activemq.artemis.core.paging.cursor.PageCache;
+import org.apache.activemq.artemis.core.paging.cursor.BulkPageCache;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
 import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
@@ -72,11 +74,11 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
    // This is the same executor used at the PageStoreImpl. One Executor per 
pageStore
    private final ArtemisExecutor executor;
 
-   private final SoftValueLongObjectHashMap<PageCache> softCache;
+   private final SoftValueLongObjectHashMap<BulkPageCache> softCache;
 
    private LongObjectHashMap<Integer> numberOfMessages = null;
 
-   private final LongObjectHashMap<CompletableFuture<PageCache>> 
inProgressReadPages;
+   private final LongObjectHashMap<CompletableFuture<BulkPageCache>> 
inProgressReadPages;
 
    private final ConcurrentLongHashMap<PageSubscription> activeCursors = new 
ConcurrentLongHashMap<>();
 
@@ -162,8 +164,8 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
             return null;
          }
          boolean createPage = false;
-         CompletableFuture<PageCache> inProgressReadPage;
-         PageCache cache;
+         CompletableFuture<BulkPageCache> inProgressReadPage;
+         BulkPageCache cache;
          Page page = null;
          synchronized (softCache) {
             cache = softCache.get(pageId);
@@ -184,8 +186,7 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
                if (numberOfMessages != null && 
numberOfMessages.containsKey(pageId)) {
                   return new PageReader(pagingStore.createPage((int) pageId), 
numberOfMessages.get(pageId));
                }
-               final CompletableFuture<PageCache> readPage = new 
CompletableFuture<>();
-               cache = createPageCache(pageId);
+               final CompletableFuture<BulkPageCache> readPage = new 
CompletableFuture<>();
                page = pagingStore.createPage((int) pageId);
                createPage = true;
                inProgressReadPage = readPage;
@@ -193,7 +194,7 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
             }
          }
          if (createPage) {
-            return readPage(pageId, page, cache, inProgressReadPage);
+            return readPage(pageId, page, inProgressReadPage);
          } else {
             final long startedWait = System.nanoTime();
             while (true) {
@@ -214,11 +215,11 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
 
    private PageCache readPage(long pageId,
                               Page page,
-                              PageCache cache,
-                              CompletableFuture<PageCache> inProgressReadPage) 
throws Exception {
+                              CompletableFuture<BulkPageCache> 
inProgressReadPage) throws Exception {
       logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, 
this.pagingStore.getAddress());
       boolean acquiredPageReadPermission = false;
       int num = -1;
+      final PageCacheImpl cache;
       try {
          final long startedRequest = System.nanoTime();
          while (!acquiredPageReadPermission) {
@@ -238,7 +239,7 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
                          pagingStore.getAddress(), 
TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize());
          }
          num = pgdMessages.size();
-         cache.setMessages(pgdMessages.toArray(new PagedMessage[num]));
+         cache = new PageCacheImpl(pageId, pgdMessages.toArray(new 
PagedMessage[num]));
       } catch (Throwable t) {
          inProgressReadPage.completeExceptionally(t);
          synchronized (softCache) {
@@ -268,8 +269,8 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
    }
 
    @Override
-   public void addPageCache(PageCache cache) {
-      logger.tracef("Add page cache %s", cache);
+   public void addLivePageCache(LivePageCache cache) {
+      logger.tracef("Add live page cache %s", cache);
       synchronized (softCache) {
          softCache.put(cache.getPageId(), cache);
       }
@@ -537,7 +538,7 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
       logger.tracef("this(%s) finishing cleanup on %s", this, depagedPages);
       try {
          for (Page depagedPage : depagedPages) {
-            PageCache cache;
+            BulkPageCache cache;
             PagedMessage[] pgdMessages;
             synchronized (softCache) {
                cache = softCache.get((long) depagedPage.getPageId());
@@ -659,17 +660,6 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
          '}';
    }
 
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   /* Protected as we may let test cases to instrument the test */
-   protected PageCacheImpl createPageCache(final long pageId) {
-      return new PageCacheImpl(pageId);
-   }
-
-   // Private -------------------------------------------------------
-
    /**
     * This method is synchronized because we want it to be atomic with the 
cursors being used
     */
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java
index f518f75..83ad21e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java
@@ -29,7 +29,6 @@ public class PageReader implements PageCache {
 
    private final Page page;
    private final int numberOfMessages;
-   private PagedMessage[] pagedMessages = null;
 
    public PageReader(Page page, int numberOfMessages) {
       this.page = page;
@@ -46,24 +45,17 @@ public class PageReader implements PageCache {
       return numberOfMessages;
    }
 
-   @Override
-   public void setMessages(PagedMessage[] messages) {
-      this.pagedMessages = messages;
-   }
-
-   @Override
-   public synchronized PagedMessage[] getMessages() {
-      if (pagedMessages != null) {
-         return pagedMessages;
-      } else {
-         try {
-            openPage();
-            return page.read().toArray(new PagedMessage[numberOfMessages]);
-         } catch (Exception e) {
-            throw new RuntimeException(e.getMessage(), e);
-         } finally {
-            close();
-         }
+   /**
+    * Used just for testing purposes.
+    */
+   protected synchronized PagedMessage[] readMessages() {
+      try {
+         openPage();
+         return page.read().toArray(new PagedMessage[numberOfMessages]);
+      } catch (Exception e) {
+         throw new RuntimeException(e.getMessage(), e);
+      } finally {
+         close();
       }
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 9fd5a96..6884486 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -490,7 +490,7 @@ public class PagingStoreImpl implements PagingStore {
 
       currentPage = page;
 
-      cursorProvider.addPageCache(pageCache);
+      cursorProvider.addLivePageCache(pageCache);
 
       /**
        * The page file might be incomplete in the cases: 1) last message 
incomplete 2) disk damaged.
@@ -1120,7 +1120,7 @@ public class PagingStoreImpl implements PagingStore {
 
          newPage.setLiveCache(pageCache);
 
-         cursorProvider.addPageCache(pageCache);
+         cursorProvider.addLivePageCache(pageCache);
 
          currentPageSize = 0;
 
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
index 2ad18e5..eba884f 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java
@@ -50,7 +50,7 @@ public class PageReaderTest extends ActiveMQTestBase {
       int[] offsets = createPage(num);
       PageReader pageReader = getPageReader();
 
-      PagedMessage[] pagedMessages = pageReader.getMessages();
+      PagedMessage[] pagedMessages = pageReader.readMessages();
       assertEquals(pagedMessages.length, num);
 
       PagedMessage pagedMessage = null;
@@ -80,7 +80,7 @@ public class PageReaderTest extends ActiveMQTestBase {
       int[] offsets = createPage(num);
       PageReader pageReader = getPageReader();
 
-      PagedMessage[] pagedMessages = pageReader.getMessages();
+      PagedMessage[] pagedMessages = pageReader.readMessages();
       assertEquals(pagedMessages.length, num);
 
       PagePosition pagePosition = new PagePositionImpl(10, 0);
@@ -108,7 +108,7 @@ public class PageReaderTest extends ActiveMQTestBase {
          int[] offsets = createPage(num);
          PageReader pageReader = getPageReader();
 
-         PagedMessage[] pagedMessages = pageReader.getMessages();
+         PagedMessage[] pagedMessages = pageReader.readMessages();
          assertEquals(pagedMessages.length, num);
 
          PagePosition pagePosition = new PagePositionImpl(10, 0, 50);

Reply via email to