This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 19b04531c60bdc5f8ad3e1950f131a75c19d3fbf
Author: franz1981 <[email protected]>
AuthorDate: Mon Jan 4 15:10:49 2021 +0100

    ARTEMIS-3049 Reduce live page lookup cost
---
 .../ConcurrentAppendOnlyChunkedList.java           | 46 +++++++++----------
 .../ConcurrentAppendOnlyChunkedListTest.java       | 53 ++++++++++++++++++++--
 .../core/paging/cursor/impl/LivePageCacheImpl.java | 40 +++++++++++++---
 .../artemis/core/paging/impl/PagingStoreImpl.java  | 11 ++---
 4 files changed, 108 insertions(+), 42 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
index c288e16..4a91fe0 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
@@ -42,8 +42,6 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
 
    private static final AtomicLongFieldUpdater<ConcurrentAppendOnlyChunkedList> LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "lastIndex");
 
-   private static final AtomicLongFieldUpdater<ConcurrentAppendOnlyChunkedList> CACHED_LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "cachedLastIndex");
-
    private final int chunkSize;
 
    private final int chunkMask;
@@ -58,10 +56,6 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
    //it's using a parity bit to mark the rotation state ie size === lastIndex >> 1
    private volatile long lastIndex = 0;
 
-   //cached view of lastIndex used to avoid invalidating lastIndex while being updated by the appends
-
-   private volatile long cachedLastIndex = 0;
-
    /**
     * @throws IllegalArgumentException if {@code chunkSize} is &lt;0 or not a power of 2
     */
@@ -105,16 +99,10 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
       if (index < 0) {
          return null;
       }
-      //it allow to perform less cache invalidations vs lastIndex if there are bursts of appends
-      long lastIndex = cachedLastIndex;
+      final long lastIndex = getValidLastIndex();
+      //it is a element over the current size?
       if (index >= lastIndex) {
-         lastIndex = getValidLastIndex();
-         //it is a element over the current size?
-         if (index >= lastIndex) {
-            return null;
-         }
-         //publish it for others readers
-         CACHED_LAST_INDEX_UPDATER.lazySet(this, lastIndex);
+         return null;
       }
       final AtomicChunk<T> buffer;
       final int offset;
@@ -139,20 +127,20 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
       final int chunkIndex = index >> chunkSizeLog2;
       //size is never allowed to be > Integer.MAX_VALUE
       final int lastChunkIndex = (int) lastIndex >> chunkSizeLog2;
-      int chunkIndexes = chunkIndex;
+      int distance = chunkIndex;
       AtomicChunk<T> buffer = null;
       boolean forward = true;
-      int distanceFromLastChunkIndex = lastChunkIndex - chunkIndex;
+      int distanceFromLast = lastChunkIndex - chunkIndex;
       //it's worth to go backward from lastChunkIndex?
       //trying first to check against the value we already have: if it won't worth, won't make sense to load the lastBuffer
-      if (distanceFromLastChunkIndex < chunkIndex) {
+      if (distanceFromLast < distance) {
          final AtomicChunk<T> lastBuffer = this.lastBuffer;
          //lastBuffer is a potential moving, always increasing, target ie better to re-check the distance
-         distanceFromLastChunkIndex = lastBuffer.index - chunkIndex;
-         if (distanceFromLastChunkIndex < chunkIndex) {
+         distanceFromLast = lastBuffer.index - chunkIndex;
+         if (distanceFromLast < distance) {
             //we're saving some jumps ie is fine to go backward from here
             buffer = lastBuffer;
-            chunkIndexes = distanceFromLastChunkIndex;
+            distance = distanceFromLast;
             forward = false;
          }
       }
@@ -160,7 +148,7 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
       if (buffer == null) {
          buffer = firstBuffer;
       }
-      for (int i = 0; i < chunkIndexes; i++) {
+      for (int i = 0; i < distance; i++) {
          //next chunk is always set if below a read lastIndex value
          //previous chunk is final and can be safely read
          buffer = forward ? buffer.next : buffer.prev;
@@ -234,21 +222,31 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
       return true;
    }
 
+   public T[] toArray(IntFunction<T[]> arrayAllocator) {
+      return toArray(arrayAllocator, 0);
+   }
+
    /**
     * Returns an array containing all of the elements in this collection in proper
     * sequence (from first to last element).<br>
     * {@code arrayAllocator} will be used to instantiate the array of the correct size with the right runtime type.
     */
-   public T[] toArray(IntFunction<T[]> arrayAllocator) {
+   public T[] toArray(IntFunction<T[]> arrayAllocator, int startIndex) {
+      if (startIndex < 0) {
+         throw new ArrayIndexOutOfBoundsException("startIndex must be >= 0");
+      }
       final long lastIndex = getValidLastIndex();
       assert lastIndex <= Integer.MAX_VALUE;
       final int size = (int) lastIndex;
       final T[] elements = arrayAllocator.apply(size);
+      if (startIndex + size > elements.length) {
+         throw new ArrayIndexOutOfBoundsException();
+      }
       //fast division by a power of 2
       final int chunkSize = this.chunkSize;
       final int chunks = size > chunkSize ? size >> chunkSizeLog2 : 0;
       AtomicChunk<T> buffer = firstBuffer;
-      int elementIndex = 0;
+      int elementIndex = startIndex;
       for (int i = 0; i < chunks; i++) {
          drain(buffer, elements, elementIndex, chunkSize);
          elementIndex += chunkSize;
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java
index 922c780..2803d60 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java
@@ -20,13 +20,15 @@
  */
 package org.apache.activemq.artemis.utils.collections;
 
+import java.util.Arrays;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 public class ConcurrentAppendOnlyChunkedListTest {
 
-   private static final int CHUNK_SIZE = 32;
-   private static final int ELEMENTS = (CHUNK_SIZE * 3) + 1;
+   private static final int CHUNK_SIZE = 16;
+   private static final int ELEMENTS = (CHUNK_SIZE * 4) + 1;
 
    private final ConcurrentAppendOnlyChunkedList<Integer> chunkedList;
 
@@ -101,7 +103,12 @@ public class ConcurrentAppendOnlyChunkedListTest {
       for (int i = 0; i < messages; i++) {
          cachedElements[i] = chunkedList.get(i);
       }
-      Assert.assertArrayEquals(cachedElements, elements);
+      Assert.assertArrayEquals(elements, cachedElements);
+      Arrays.fill(cachedElements, null);
+      for (int i = messages - 1; i >= 0; i--) {
+         cachedElements[i] = chunkedList.get(i);
+      }
+      Assert.assertArrayEquals(elements, cachedElements);
    }
 
    @Test
@@ -117,7 +124,7 @@ public class ConcurrentAppendOnlyChunkedListTest {
       for (int i = 0; i < messages; i++) {
          cachedElements[i] = chunkedList.get(i);
       }
-      Assert.assertArrayEquals(cachedElements, elements);
+      Assert.assertArrayEquals(elements, cachedElements);
    }
 
    @Test
@@ -134,6 +141,44 @@ public class ConcurrentAppendOnlyChunkedListTest {
    }
 
    @Test
+   public void shouldToArrayWithIndexReturnElementsAccordingToAddOrder() {
+      final int messages = ELEMENTS;
+      final Integer[] elements = new Integer[messages];
+      for (int i = 0; i < messages; i++) {
+         final Integer element = i;
+         elements[i] = element;
+         chunkedList.add(element);
+      }
+      final int offset = 10;
+      final Integer[] cachedElements = chunkedList.toArray(size -> new Integer[offset + size], offset);
+      Assert.assertArrayEquals(elements, Arrays.copyOfRange(cachedElements, offset, cachedElements.length));
+      Assert.assertArrayEquals(new Integer[offset], Arrays.copyOfRange(cachedElements, 0, offset));
+   }
+
+   @Test(expected = ArrayIndexOutOfBoundsException.class)
+   public void shouldFailToArrayWithInsufficientArrayCapacity() {
+      final int messages = ELEMENTS;
+      final Integer[] elements = new Integer[messages];
+      for (int i = 0; i < messages; i++) {
+         final Integer element = i;
+         elements[i] = element;
+         chunkedList.add(element);
+      }
+      final int offset = 10;
+      chunkedList.toArray(size -> new Integer[offset + size - 1], offset);
+   }
+
+   @Test(expected = ArrayIndexOutOfBoundsException.class)
+   public void shouldFailToArrayWithNegativeStartIndex() {
+      chunkedList.toArray(Integer[]::new, -1);
+   }
+
+   @Test(expected = NullPointerException.class)
+   public void shouldFailToArrayWithNullArray() {
+      chunkedList.toArray(size -> null);
+   }
+
+   @Test
    public void shouldToArrayReturnElementsAccordingToAddAllOrder() {
       final int messages = ELEMENTS;
       final Integer[] elements = new Integer[messages];
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
index 6fb42c1..4f93e36 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
@@ -31,7 +31,9 @@ public final class LivePageCacheImpl implements LivePageCache {
 
    private static final int CHUNK_SIZE = 32;
 
-   private final ConcurrentAppendOnlyChunkedList<PagedMessage> messages;
+   private final PagedMessage[] initialMessages;
+
+   private final ConcurrentAppendOnlyChunkedList<PagedMessage> liveMessages;
 
    private final long pageId;
 
@@ -39,7 +41,19 @@ public final class LivePageCacheImpl implements LivePageCache {
 
    public LivePageCacheImpl(final long pageId) {
       this.pageId = pageId;
-      this.messages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE);
+      this.liveMessages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE);
+      this.initialMessages = null;
+   }
+
+   public LivePageCacheImpl(final long pageId, PagedMessage[] initialMessages) {
+      this.pageId = pageId;
+      this.liveMessages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE);
+      this.initialMessages = initialMessages;
+   }
+
+   private int initialMessagesSize() {
+      final PagedMessage[] initialMessages = this.initialMessages;
+      return initialMessages == null ? 0 : initialMessages.length;
    }
 
    @Override
@@ -49,7 +63,7 @@ public final class LivePageCacheImpl implements LivePageCache {
 
    @Override
    public int getNumberOfMessages() {
-      return messages.size();
+      return initialMessagesSize() + liveMessages.size();
    }
 
    @Override
@@ -62,7 +76,16 @@ public final class LivePageCacheImpl implements LivePageCache {
 
    @Override
    public PagedMessage getMessage(PagePosition pagePosition) {
-      return messages.get(pagePosition.getMessageNr());
+      final int messageNr = pagePosition.getMessageNr();
+      if (messageNr < 0) {
+         return null;
+      }
+      final int initialOffset = initialMessagesSize();
+      if (messageNr < initialOffset) {
+         return initialMessages[messageNr];
+      }
+      final int index = messageNr - initialOffset;
+      return liveMessages.get(index);
    }
 
    @Override
@@ -73,7 +96,7 @@ public final class LivePageCacheImpl implements LivePageCache {
    @Override
    public void addLiveMessage(PagedMessage message) {
       message.getMessage().usageUp();
-      messages.add(message);
+      liveMessages.add(message);
    }
 
    @Override
@@ -84,7 +107,12 @@ public final class LivePageCacheImpl implements LivePageCache {
 
    @Override
    public PagedMessage[] getMessages() {
-      return messages.toArray(PagedMessage[]::new);
+      final PagedMessage[] pagedMessages = liveMessages.toArray(size -> new PagedMessage[initialMessagesSize() + size], initialMessagesSize());
+      final PagedMessage[] initialMessages = this.initialMessages;
+      if (initialMessages != null) {
+         System.arraycopy(initialMessages, 0, pagedMessages, 0, initialMessages.length);
+      }
+      return pagedMessages;
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 3c248af..9fd5a96 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -478,16 +478,11 @@ public class PagingStoreImpl implements PagingStore {
       Page page = createPage(pageId);
       page.open();
 
-      List<PagedMessage> messages = page.read(storageManager);
+      final List<PagedMessage> messages = page.read(storageManager);
 
-      LivePageCache pageCache = new LivePageCacheImpl(pageId);
+      final PagedMessage[] initialMessages = messages.toArray(new PagedMessage[messages.size()]);
 
-      for (PagedMessage msg : messages) {
-         pageCache.addLiveMessage(msg);
-         // As we add back to the live page,
-         // we have to discount one when we read the page
-         msg.getMessage().usageDown();
-      }
+      final LivePageCache pageCache = new LivePageCacheImpl(pageId, initialMessages);
 
       page.setLiveCache(pageCache);
 

Reply via email to