Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246599701 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { + Thread.yield(); + continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { + Thread.yield(); + } + //it is a message over the current size? 
+ if (messageNumber >= size) { + return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk<PagedMessage> buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** + * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries + * ie backward search of a node if needed. + */ + private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; --- End diff -- eheh you're right :P, but the code of this collection came from a version where `chunkSize` was not a static final constant: in the new version it is clearer why I did it
---