Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246574775 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { + Thread.yield(); + continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { + Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { + return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk<PagedMessage> buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** + * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries + * ie backward search of a node if needed. + */ + private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; + //size is never allowed to be > Integer.MAX_VALUE + final int lastChunkIndex = (int) size >> chunkSizeLog2; + int requiredJumps = jumps; + AtomicChunk<PagedMessage> jumpBuffer = null; + boolean jumpForward = true; + int distanceFromLastChunkIndex = lastChunkIndex - jumps; + //it's worth to go backward from lastChunkIndex? + //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer + if (distanceFromLastChunkIndex < jumps) { + final AtomicChunk<PagedMessage> producer = producerBuffer; + //producer is a potential moving, always increasing, target ie better to re-check the distance + distanceFromLastChunkIndex = producer.index - jumps; + if (distanceFromLastChunkIndex < jumps) { + //we're saving some jumps ie is fine to go backward from here + jumpBuffer = producer; + requiredJumps = distanceFromLastChunkIndex; + jumpForward = false; + } + } + //start from the consumer buffer only is needed + if (jumpBuffer == null) { + jumpBuffer = consumerBuffer; + } + for (int i = 0; i < requiredJumps; i++) { + //next chunk is always set if below a read producerIndex value + //previous chunk is final and can be safely read + jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev; + } + return jumpBuffer; } @Override - public synchronized boolean isLive() { + public boolean isLive() { return isLive; } @Override - public synchronized void addLiveMessage(PagedMessage message) { + public void addLiveMessage(PagedMessage message) { if (message.getMessage().isLargeMessage()) { ((LargeServerMessage) message.getMessage()).incrementDelayDeletionCount(); } - this.messages.add(message); + while (true) { + final long pIndex = producerIndex; + if (pIndex != RESIZING) { + if (pIndex == Integer.MAX_VALUE) { + throw new IllegalStateException("can't add more then " + Integer.MAX_VALUE + " messages"); + } + //load acquire the current producer buffer + final AtomicChunk<PagedMessage> producerBuffer = this.producerBuffer; + final int pOffset = (int) (pIndex & chunkMask); + //only the first message to a chunk can attempt to resize + if (pOffset == 0) { + if (appendChunkAndMessage(producerBuffer, pIndex, message)) { + return; + } + } else if (PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, pIndex + 1)) { + //this.producerBuffer is the correct buffer to append a message: it is guarded by the producerIndex logic + //NOTE: producerIndex is being updated before setting a new value + producerBuffer.lazySet(pOffset, message); + return; + } + } + Thread.yield(); + } + } + + private boolean appendChunkAndMessage(AtomicChunk<PagedMessage> producerBuffer, long pIndex, PagedMessage message) { + if (!PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, RESIZING)) { + return false; + } + final AtomicChunk<PagedMessage> newChunk; + try { + final int index = (int) (pIndex >> chunkSizeLog2); + newChunk = new AtomicChunk<>(index, producerBuffer, chunkSize); + } catch (OutOfMemoryError oom) { + //unblock producerIndex without updating it + PRODUCER_INDEX_UPDATER.lazySet(this, pIndex); + throw oom; + } + //adding the message to it + newChunk.lazySet(0, message); + //linking it to the old one, if any + if (producerBuffer != null) { + //a plain store is enough, given that producerIndex prevents any reader/writer to access it + producerBuffer.next = newChunk; + } else { + //it's first one + this.consumerBuffer = newChunk; + } + //making it the current produced one + this.producerBuffer = newChunk; + //store release any previous write and "unblock" anyone waiting resizing to finish + PRODUCER_INDEX_UPDATER.lazySet(this, pIndex + 1); + return true; } @Override - public synchronized void close() { + public void close() { logger.tracef("Closing %s", this); this.isLive = false; } + private static PagedMessage[] EMPTY_MSG = null; + + private static PagedMessage[] noMessages() { + //it is a benign race: no need strong initializations here + PagedMessage[] empty = EMPTY_MSG; + if (empty != null) { + return empty; + } else { + empty = new PagedMessage[0]; + EMPTY_MSG = empty; + } + return empty; + } + @Override - public synchronized PagedMessage[] getMessages() { - return messages.toArray(new PagedMessage[messages.size()]); + public PagedMessage[] getMessages() { + long currentSize; + while ((currentSize = producerIndex) == RESIZING) { --- End diff -- good catch! :+1: While creating a more generic collection I will refactor the bits to avoid duplication when possible
---