This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 54b0094cd6d8b9f488dedeea60449d4cc58a1ef6 Author: franz1981 <[email protected]> AuthorDate: Tue Jan 5 11:06:58 2021 +0100 ARTEMIS-3049 Simplify PageCache API --- .../{LivePageCache.java => BulkPageCache.java} | 4 +-- .../artemis/core/paging/cursor/LivePageCache.java | 2 +- .../artemis/core/paging/cursor/PageCache.java | 13 ++------ .../core/paging/cursor/PageCursorProvider.java | 2 +- .../core/paging/cursor/impl/LivePageCacheImpl.java | 8 ----- .../core/paging/cursor/impl/PageCacheImpl.java | 24 +++----------- .../paging/cursor/impl/PageCursorProviderImpl.java | 38 ++++++++-------------- .../core/paging/cursor/impl/PageReader.java | 30 +++++++---------- .../artemis/core/paging/impl/PagingStoreImpl.java | 4 +-- .../core/paging/cursor/impl/PageReaderTest.java | 6 ++-- 10 files changed, 41 insertions(+), 90 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/BulkPageCache.java similarity index 90% copy from artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java copy to artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/BulkPageCache.java index 0d09bc4..98ef786 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/BulkPageCache.java @@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor; import org.apache.activemq.artemis.core.paging.PagedMessage; -public interface LivePageCache extends PageCache { +public interface BulkPageCache extends PageCache { - void addLiveMessage(PagedMessage message); + PagedMessage[] getMessages(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java index 0d09bc4..01f09a8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/LivePageCache.java @@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor; import org.apache.activemq.artemis.core.paging.PagedMessage; -public interface LivePageCache extends PageCache { +public interface LivePageCache extends BulkPageCache { void addLiveMessage(PagedMessage message); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java index 646b568..a79dc94 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java @@ -19,28 +19,19 @@ package org.apache.activemq.artemis.core.paging.cursor; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap; -public interface PageCache extends SoftValueLongObjectHashMap.ValueCache { +public interface PageCache extends SoftValueLongObjectHashMap.ValueCache, AutoCloseable { long getPageId(); int getNumberOfMessages(); - void setMessages(PagedMessage[] messages); - - PagedMessage[] getMessages(); - - /** - * @return whether this cache is still being updated - */ - @Override - boolean isLive(); - /** * @param pagePosition page position * @return */ PagedMessage getMessage(PagePosition pagePosition); + @Override void close(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java index 29ce8a0..15d78b0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java @@ -33,7 +33,7 @@ public interface PageCursorProvider { PagedReference newReference(PagePosition pos, PagedMessage msg, PageSubscription sub); - void addPageCache(PageCache cache); + void addLivePageCache(LivePageCache cache); /** * @param queueId The cursorID should be the same as the queueId associated for persistence diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java index 4f93e36..260b097 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java @@ -67,14 +67,6 @@ public final class LivePageCacheImpl implements LivePageCache { } @Override - public void setMessages(PagedMessage[] messages) { - // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway - for (PagedMessage message : messages) { - addLiveMessage(message); - } - } - - @Override public PagedMessage getMessage(PagePosition pagePosition) { final int messageNr = pagePosition.getMessageNr(); if (messageNr < 0) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java index a350ceb..84c0cac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java @@ -17,32 +17,23 @@ package org.apache.activemq.artemis.core.paging.cursor.impl; import org.apache.activemq.artemis.core.paging.PagedMessage; -import org.apache.activemq.artemis.core.paging.cursor.PageCache; +import org.apache.activemq.artemis.core.paging.cursor.BulkPageCache; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; /** * The caching associated to a single page. */ -class PageCacheImpl implements PageCache { +class PageCacheImpl implements BulkPageCache { - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - private PagedMessage[] messages; + private final PagedMessage[] messages; private final long pageId; - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - PageCacheImpl(final long pageId) { + PageCacheImpl(final long pageId, PagedMessage[] messages) { this.pageId = pageId; + this.messages = messages; } - // Public -------------------------------------------------------- - @Override public PagedMessage getMessage(PagePosition pagePosition) { if (pagePosition.getMessageNr() < messages.length) { @@ -58,11 +49,6 @@ class PageCacheImpl implements PageCache { } @Override - public void setMessages(final PagedMessage[] messages) { - this.messages = messages; - } - - @Override public int getNumberOfMessages() { return messages.length; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index 7ace24b..9acffb3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -28,8 +28,10 @@ import io.netty.util.collection.LongObjectHashMap; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.cursor.LivePageCache; import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage; import org.apache.activemq.artemis.core.paging.cursor.PageCache; +import org.apache.activemq.artemis.core.paging.cursor.BulkPageCache; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; @@ -72,11 +74,11 @@ public class PageCursorProviderImpl implements PageCursorProvider { // This is the same executor used at the PageStoreImpl. One Executor per pageStore private final ArtemisExecutor executor; - private final SoftValueLongObjectHashMap<PageCache> softCache; + private final SoftValueLongObjectHashMap<BulkPageCache> softCache; private LongObjectHashMap<Integer> numberOfMessages = null; - private final LongObjectHashMap<CompletableFuture<PageCache>> inProgressReadPages; + private final LongObjectHashMap<CompletableFuture<BulkPageCache>> inProgressReadPages; private final ConcurrentLongHashMap<PageSubscription> activeCursors = new ConcurrentLongHashMap<>(); @@ -162,8 +164,8 @@ public class PageCursorProviderImpl implements PageCursorProvider { return null; } boolean createPage = false; - CompletableFuture<PageCache> inProgressReadPage; - PageCache cache; + CompletableFuture<BulkPageCache> inProgressReadPage; + BulkPageCache cache; Page page = null; synchronized (softCache) { cache = softCache.get(pageId); @@ -184,8 +186,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { if (numberOfMessages != null && numberOfMessages.containsKey(pageId)) { return new PageReader(pagingStore.createPage((int) pageId), numberOfMessages.get(pageId)); } - final CompletableFuture<PageCache> readPage = new CompletableFuture<>(); - cache = createPageCache(pageId); + final CompletableFuture<BulkPageCache> readPage = new CompletableFuture<>(); page = pagingStore.createPage((int) pageId); createPage = true; inProgressReadPage = readPage; @@ -193,7 +194,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { } } if (createPage) { - return readPage(pageId, page, cache, inProgressReadPage); + return readPage(pageId, page, inProgressReadPage); } else { final long startedWait = System.nanoTime(); while (true) { @@ -214,11 +215,11 @@ public class PageCursorProviderImpl implements PageCursorProvider { private PageCache readPage(long pageId, Page page, - PageCache cache, - CompletableFuture<PageCache> inProgressReadPage) throws Exception { + CompletableFuture<BulkPageCache> inProgressReadPage) throws Exception { logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress()); boolean acquiredPageReadPermission = false; int num = -1; + final PageCacheImpl cache; try { final long startedRequest = System.nanoTime(); while (!acquiredPageReadPermission) { @@ -238,7 +239,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { pagingStore.getAddress(), TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize()); } num = pgdMessages.size(); - cache.setMessages(pgdMessages.toArray(new PagedMessage[num])); + cache = new PageCacheImpl(pageId, pgdMessages.toArray(new PagedMessage[num])); } catch (Throwable t) { inProgressReadPage.completeExceptionally(t); synchronized (softCache) { @@ -268,8 +269,8 @@ public class PageCursorProviderImpl implements PageCursorProvider { } @Override - public void addPageCache(PageCache cache) { - logger.tracef("Add page cache %s", cache); + public void addLivePageCache(LivePageCache cache) { + logger.tracef("Add live page cache %s", cache); synchronized (softCache) { softCache.put(cache.getPageId(), cache); } @@ -537,7 +538,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { logger.tracef("this(%s) finishing cleanup on %s", this, depagedPages); try { for (Page depagedPage : depagedPages) { - PageCache cache; + BulkPageCache cache; PagedMessage[] pgdMessages; synchronized (softCache) { cache = softCache.get((long) depagedPage.getPageId()); @@ -659,17 +660,6 @@ public class PageCursorProviderImpl implements PageCursorProvider { '}'; } - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - /* Protected as we may let test cases to instrument the test */ - protected PageCacheImpl createPageCache(final long pageId) { - return new PageCacheImpl(pageId); - } - - // Private ------------------------------------------------------- - /** * This method is synchronized because we want it to be atomic with the cursors being used */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java index f518f75..83ad21e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReader.java @@ -29,7 +29,6 @@ public class PageReader implements PageCache { private final Page page; private final int numberOfMessages; - private PagedMessage[] pagedMessages = null; public PageReader(Page page, int numberOfMessages) { this.page = page; @@ -46,24 +45,17 @@ public class PageReader implements PageCache { return numberOfMessages; } - @Override - public void setMessages(PagedMessage[] messages) { - this.pagedMessages = messages; - } - - @Override - public synchronized PagedMessage[] getMessages() { - if (pagedMessages != null) { - return pagedMessages; - } else { - try { - openPage(); - return page.read().toArray(new PagedMessage[numberOfMessages]); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } finally { - close(); - } + /** + * Used just for testing purposes. + */ + protected synchronized PagedMessage[] readMessages() { + try { + openPage(); + return page.read().toArray(new PagedMessage[numberOfMessages]); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } finally { + close(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 9fd5a96..6884486 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -490,7 +490,7 @@ public class PagingStoreImpl implements PagingStore { currentPage = page; - cursorProvider.addPageCache(pageCache); + cursorProvider.addLivePageCache(pageCache); /** * The page file might be incomplete in the cases: 1) last message incomplete 2) disk damaged. @@ -1120,7 +1120,7 @@ public class PagingStoreImpl implements PagingStore { newPage.setLiveCache(pageCache); - cursorProvider.addPageCache(pageCache); + cursorProvider.addLivePageCache(pageCache); currentPageSize = 0; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java index 2ad18e5..eba884f 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageReaderTest.java @@ -50,7 +50,7 @@ public class PageReaderTest extends ActiveMQTestBase { int[] offsets = createPage(num); PageReader pageReader = getPageReader(); - PagedMessage[] pagedMessages = pageReader.getMessages(); + PagedMessage[] pagedMessages = pageReader.readMessages(); assertEquals(pagedMessages.length, num); PagedMessage pagedMessage = null; @@ -80,7 +80,7 @@ public class PageReaderTest extends ActiveMQTestBase { int[] offsets = createPage(num); PageReader pageReader = getPageReader(); - PagedMessage[] pagedMessages = pageReader.getMessages(); + PagedMessage[] pagedMessages = pageReader.readMessages(); assertEquals(pagedMessages.length, num); PagePosition pagePosition = new PagePositionImpl(10, 0); @@ -108,7 +108,7 @@ public class PageReaderTest extends ActiveMQTestBase { int[] offsets = createPage(num); PageReader pageReader = getPageReader(); - PagedMessage[] pagedMessages = pageReader.getMessages(); + PagedMessage[] pagedMessages = pageReader.readMessages(); assertEquals(pagedMessages.length, num); PagePosition pagePosition = new PagePositionImpl(10, 0, 50);
