This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 19b04531c60bdc5f8ad3e1950f131a75c19d3fbf Author: franz1981 <[email protected]> AuthorDate: Mon Jan 4 15:10:49 2021 +0100 ARTEMIS-3049 Reduce live page lookup cost --- .../ConcurrentAppendOnlyChunkedList.java | 46 +++++++++---------- .../ConcurrentAppendOnlyChunkedListTest.java | 53 ++++++++++++++++++++-- .../core/paging/cursor/impl/LivePageCacheImpl.java | 40 +++++++++++++--- .../artemis/core/paging/impl/PagingStoreImpl.java | 11 ++--- 4 files changed, 108 insertions(+), 42 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java index c288e16..4a91fe0 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java @@ -42,8 +42,6 @@ public final class ConcurrentAppendOnlyChunkedList<T> { private static final AtomicLongFieldUpdater<ConcurrentAppendOnlyChunkedList> LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "lastIndex"); - private static final AtomicLongFieldUpdater<ConcurrentAppendOnlyChunkedList> CACHED_LAST_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, "cachedLastIndex"); - private final int chunkSize; private final int chunkMask; @@ -58,10 +56,6 @@ public final class ConcurrentAppendOnlyChunkedList<T> { //it's using a parity bit to mark the rotation state ie size === lastIndex >> 1 private volatile long lastIndex = 0; - //cached view of lastIndex used to avoid invalidating lastIndex while being updated by the appends - - private volatile long cachedLastIndex = 0; - /** * @throws IllegalArgumentException if {@code chunkSize} is <0 or not a power of 2 */ @@ -105,16 +99,10 @@ public final class ConcurrentAppendOnlyChunkedList<T> { if (index < 0) { return null; } - //it allow to perform less cache invalidations vs lastIndex if there are bursts of appends - long lastIndex = cachedLastIndex; + final long lastIndex = getValidLastIndex(); + //it is a element over the current size? if (index >= lastIndex) { - lastIndex = getValidLastIndex(); - //it is a element over the current size? - if (index >= lastIndex) { - return null; - } - //publish it for others readers - CACHED_LAST_INDEX_UPDATER.lazySet(this, lastIndex); + return null; } final AtomicChunk<T> buffer; final int offset; @@ -139,20 +127,20 @@ public final class ConcurrentAppendOnlyChunkedList<T> { final int chunkIndex = index >> chunkSizeLog2; //size is never allowed to be > Integer.MAX_VALUE final int lastChunkIndex = (int) lastIndex >> chunkSizeLog2; - int chunkIndexes = chunkIndex; + int distance = chunkIndex; AtomicChunk<T> buffer = null; boolean forward = true; - int distanceFromLastChunkIndex = lastChunkIndex - chunkIndex; + int distanceFromLast = lastChunkIndex - chunkIndex; //it's worth to go backward from lastChunkIndex? //trying first to check against the value we already have: if it won't worth, won't make sense to load the lastBuffer - if (distanceFromLastChunkIndex < chunkIndex) { + if (distanceFromLast < distance) { final AtomicChunk<T> lastBuffer = this.lastBuffer; //lastBuffer is a potential moving, always increasing, target ie better to re-check the distance - distanceFromLastChunkIndex = lastBuffer.index - chunkIndex; - if (distanceFromLastChunkIndex < chunkIndex) { + distanceFromLast = lastBuffer.index - chunkIndex; + if (distanceFromLast < distance) { //we're saving some jumps ie is fine to go backward from here buffer = lastBuffer; - chunkIndexes = distanceFromLastChunkIndex; + distance = distanceFromLast; forward = false; } } @@ -160,7 +148,7 @@ public final class ConcurrentAppendOnlyChunkedList<T> { if (buffer == null) { buffer = firstBuffer; } - for (int i = 0; i < chunkIndexes; i++) { + for (int i = 0; i < distance; i++) { //next chunk is always set if below a read lastIndex value //previous chunk is final and can be safely read buffer = forward ? buffer.next : buffer.prev; @@ -234,21 +222,31 @@ public final class ConcurrentAppendOnlyChunkedList<T> { return true; } + public T[] toArray(IntFunction<T[]> arrayAllocator) { + return toArray(arrayAllocator, 0); + } + /** * Returns an array containing all of the elements in this collection in proper * sequence (from first to last element).<br> * {@code arrayAllocator} will be used to instantiate the array of the correct size with the right runtime type. */ - public T[] toArray(IntFunction<T[]> arrayAllocator) { + public T[] toArray(IntFunction<T[]> arrayAllocator, int startIndex) { + if (startIndex < 0) { + throw new ArrayIndexOutOfBoundsException("startIndex must be >= 0"); + } final long lastIndex = getValidLastIndex(); assert lastIndex <= Integer.MAX_VALUE; final int size = (int) lastIndex; final T[] elements = arrayAllocator.apply(size); + if (startIndex + size > elements.length) { + throw new ArrayIndexOutOfBoundsException(); + } //fast division by a power of 2 final int chunkSize = this.chunkSize; final int chunks = size > chunkSize ? size >> chunkSizeLog2 : 0; AtomicChunk<T> buffer = firstBuffer; - int elementIndex = 0; + int elementIndex = startIndex; for (int i = 0; i < chunks; i++) { drain(buffer, elements, elementIndex, chunkSize); elementIndex += chunkSize; diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java index 922c780..2803d60 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java @@ -20,13 +20,15 @@ */ package org.apache.activemq.artemis.utils.collections; +import java.util.Arrays; + import org.junit.Assert; import org.junit.Test; public class ConcurrentAppendOnlyChunkedListTest { - private static final int CHUNK_SIZE = 32; - private static final int ELEMENTS = (CHUNK_SIZE * 3) + 1; + private static final int CHUNK_SIZE = 16; + private static final int ELEMENTS = (CHUNK_SIZE * 4) + 1; private final ConcurrentAppendOnlyChunkedList<Integer> chunkedList; @@ -101,7 +103,12 @@ public class ConcurrentAppendOnlyChunkedListTest { for (int i = 0; i < messages; i++) { cachedElements[i] = chunkedList.get(i); } - Assert.assertArrayEquals(cachedElements, elements); + Assert.assertArrayEquals(elements, cachedElements); + Arrays.fill(cachedElements, null); + for (int i = messages - 1; i >= 0; i--) { + cachedElements[i] = chunkedList.get(i); + } + Assert.assertArrayEquals(elements, cachedElements); } @Test @@ -117,7 +124,7 @@ public class ConcurrentAppendOnlyChunkedListTest { for (int i = 0; i < messages; i++) { cachedElements[i] = chunkedList.get(i); } - Assert.assertArrayEquals(cachedElements, elements); + Assert.assertArrayEquals(elements, cachedElements); } @Test @@ -134,6 +141,44 @@ public class ConcurrentAppendOnlyChunkedListTest { } @Test + public void shouldToArrayWithIndexReturnElementsAccordingToAddOrder() { + final int messages = ELEMENTS; + final Integer[] elements = new Integer[messages]; + for (int i = 0; i < messages; i++) { + final Integer element = i; + elements[i] = element; + chunkedList.add(element); + } + final int offset = 10; + final Integer[] cachedElements = chunkedList.toArray(size -> new Integer[offset + size], offset); + Assert.assertArrayEquals(elements, Arrays.copyOfRange(cachedElements, offset, cachedElements.length)); + Assert.assertArrayEquals(new Integer[offset], Arrays.copyOfRange(cachedElements, 0, offset)); + } + + @Test(expected = ArrayIndexOutOfBoundsException.class) + public void shouldFailToArrayWithInsufficientArrayCapacity() { + final int messages = ELEMENTS; + final Integer[] elements = new Integer[messages]; + for (int i = 0; i < messages; i++) { + final Integer element = i; + elements[i] = element; + chunkedList.add(element); + } + final int offset = 10; + chunkedList.toArray(size -> new Integer[offset + size - 1], offset); + } + + @Test(expected = ArrayIndexOutOfBoundsException.class) + public void shouldFailToArrayWithNegativeStartIndex() { + chunkedList.toArray(Integer[]::new, -1); + } + + @Test(expected = NullPointerException.class) + public void shouldFailToArrayWithNullArray() { + chunkedList.toArray(size -> null); + } + + @Test public void shouldToArrayReturnElementsAccordingToAddAllOrder() { final int messages = ELEMENTS; final Integer[] elements = new Integer[messages]; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java index 6fb42c1..4f93e36 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java @@ -31,7 +31,9 @@ public final class LivePageCacheImpl implements LivePageCache { private static final int CHUNK_SIZE = 32; - private final ConcurrentAppendOnlyChunkedList<PagedMessage> messages; + private final PagedMessage[] initialMessages; + + private final ConcurrentAppendOnlyChunkedList<PagedMessage> liveMessages; private final long pageId; @@ -39,7 +41,19 @@ public final class LivePageCacheImpl implements LivePageCache { public LivePageCacheImpl(final long pageId) { this.pageId = pageId; - this.messages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE); + this.liveMessages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE); + this.initialMessages = null; + } + + public LivePageCacheImpl(final long pageId, PagedMessage[] initialMessages) { + this.pageId = pageId; + this.liveMessages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE); + this.initialMessages = initialMessages; + } + + private int initialMessagesSize() { + final PagedMessage[] initialMessages = this.initialMessages; + return initialMessages == null ? 0 : initialMessages.length; } @Override @@ -49,7 +63,7 @@ public final class LivePageCacheImpl implements LivePageCache { @Override public int getNumberOfMessages() { - return messages.size(); + return initialMessagesSize() + liveMessages.size(); } @Override @@ -62,7 +76,16 @@ public final class LivePageCacheImpl implements LivePageCache { @Override public PagedMessage getMessage(PagePosition pagePosition) { - return messages.get(pagePosition.getMessageNr()); + final int messageNr = pagePosition.getMessageNr(); + if (messageNr < 0) { + return null; + } + final int initialOffset = initialMessagesSize(); + if (messageNr < initialOffset) { + return initialMessages[messageNr]; + } + final int index = messageNr - initialOffset; + return liveMessages.get(index); } @Override @@ -73,7 +96,7 @@ public final class LivePageCacheImpl implements LivePageCache { @Override public void addLiveMessage(PagedMessage message) { message.getMessage().usageUp(); - messages.add(message); + liveMessages.add(message); } @Override @@ -84,7 +107,12 @@ public final class LivePageCacheImpl implements LivePageCache { @Override public PagedMessage[] getMessages() { - return messages.toArray(PagedMessage[]::new); + final PagedMessage[] pagedMessages = liveMessages.toArray(size -> new PagedMessage[initialMessagesSize() + size], initialMessagesSize()); + final PagedMessage[] initialMessages = this.initialMessages; + if (initialMessages != null) { + System.arraycopy(initialMessages, 0, pagedMessages, 0, initialMessages.length); + } + return pagedMessages; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 3c248af..9fd5a96 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -478,16 +478,11 @@ public class PagingStoreImpl implements PagingStore { Page page = createPage(pageId); page.open(); - List<PagedMessage> messages = page.read(storageManager); + final List<PagedMessage> messages = page.read(storageManager); - LivePageCache pageCache = new LivePageCacheImpl(pageId); + final PagedMessage[] initialMessages = messages.toArray(new PagedMessage[messages.size()]); - for (PagedMessage msg : messages) { - pageCache.addLiveMessage(msg); - // As we add back to the live page, - // we have to discount one when we read the page - msg.getMessage().usageDown(); - } + final LivePageCache pageCache = new LivePageCacheImpl(pageId, initialMessages); page.setLiveCache(pageCache);
