Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246599701
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    +      if (messageNumber >= size) {
    +         while ((size = producerIndex) == RESIZING) {
    +            Thread.yield();
    +         }
    +         //it is a message over the current size?
    +         if (messageNumber >= size) {
    +            return null;
    +         }
    +         //publish it for others consumers
    +         LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
    +      }
    +      final AtomicChunk<PagedMessage> buffer;
    +      final int offset;
    +      if (messageNumber >= chunkSize) {
    +         offset = messageNumber & chunkMask;
    +         //slow path is moved in a separate method
    +         buffer = jump(messageNumber, size);
    +      } else {
    +         offset = messageNumber;
    +         buffer = consumerBuffer;
    +      }
    +      //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
    +      PagedMessage msg;
    +      while ((msg = buffer.get(offset)) == null) {
    +         Thread.yield();
    +      }
    +      return msg;
    +   }
    +
    +   /**
    +    * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
    +    * ie backward search of a node if needed.
    +    */
    +   private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
    +      //fast division by a power of 2
    +      final int jumps = messageNumber >> chunkSizeLog2;
    --- End diff --
    
    Heh, you're right :P, but the code for this collection came from a version where `chunkSize` was not a static final constant; in the new version it is clearer why I did it this way.
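
For readers following along, here is a minimal sketch of the shift-and-mask arithmetic the diff relies on. It assumes a power-of-two chunk size and non-negative message numbers. The class name `ChunkIndexing`, the value 32, and the helper names are hypothetical and not taken from the PR; only the `>> chunkSizeLog2` and `& chunkMask` expressions mirror the diff.

```java
public final class ChunkIndexing {
    // Hypothetical chunk size for illustration only; it must be a power of 2.
    static final int CHUNK_SIZE = 32;
    static final int CHUNK_MASK = CHUNK_SIZE - 1;
    static final int CHUNK_SIZE_LOG2 = Integer.numberOfTrailingZeros(CHUNK_SIZE);

    // Number of chunks to skip: same result as messageNumber / CHUNK_SIZE
    // for non-negative messageNumber.
    static int chunkOf(int messageNumber) {
        return messageNumber >> CHUNK_SIZE_LOG2;
    }

    // Position inside the chunk: same result as messageNumber % CHUNK_SIZE
    // for non-negative messageNumber.
    static int offsetOf(int messageNumber) {
        return messageNumber & CHUNK_MASK;
    }

    public static void main(String[] args) {
        // Check the equivalence against plain division and remainder.
        for (int i = 0; i < 4 * CHUNK_SIZE; i++) {
            if (chunkOf(i) != i / CHUNK_SIZE || offsetOf(i) != i % CHUNK_SIZE) {
                throw new AssertionError("mismatch at " + i);
            }
        }
        System.out.println("shift/mask agree with / and % for 0.." + (4 * CHUNK_SIZE - 1));
    }
}
```

When the divisor is a compile-time constant power of two, the compiler or JIT can usually apply this strength reduction on its own, so the explicit shift matters mostly when the chunk size is only known at run time.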

