diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
new file mode 100644
index 0000000000..f55d9e1a0c
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.IntFunction;
+
+/**
+ * This collection is a concurrent append-only list that grows in chunks.<br>
+ * It's safe to be used by many threads concurrently and has a max capacity of 
{@link Integer#MAX_VALUE}.
+ */
+public final class ConcurrentAppendOnlyChunkedList<T> {
+
+   private static final class AtomicChunk<T> extends AtomicReferenceArray<T> {
+
+      AtomicChunk<T> next = null;
+      final AtomicChunk<T> prev;
+      final int index;
+
+      AtomicChunk(int index, AtomicChunk<T> prev, int length) {
+         super(length);
+         this.index = index;
+         this.prev = prev;
+      }
+   }
+
+   private static final 
AtomicLongFieldUpdater<ConcurrentAppendOnlyChunkedList> LAST_INDEX_UPDATER = 
AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, 
"lastIndex");
+
+   private static final 
AtomicLongFieldUpdater<ConcurrentAppendOnlyChunkedList> 
CACHED_LAST_INDEX_UPDATER = 
AtomicLongFieldUpdater.newUpdater(ConcurrentAppendOnlyChunkedList.class, 
"cachedLastIndex");
+
+   private final int chunkSize;
+
+   private final int chunkMask;
+
+   private final int chunkSizeLog2;
+
+   private static final long RESIZING = -1;
+
+   private AtomicChunk<T> firstBuffer = null;
+
+   private AtomicChunk<T> lastBuffer = null;
+
+   //it is both the current index of the next element to be claimed and the 
current size of the collection
+   private volatile long lastIndex = 0;
+
+   //cached view of lastIndex used to avoid invalidating lastIndex while being 
updated by the appends
+
+   private volatile long cachedLastIndex = 0;
+
+   /**
+    * @throws IllegalArgumentException if {@code chunkSize} is not positive or not a power of 2
+    */
+   public ConcurrentAppendOnlyChunkedList(final int chunkSize) {
+      if (chunkSize <= 0) {
+         throw new IllegalArgumentException("chunkSize must be >0");
+      }
+      //IMPORTANT: to enable some nice optimizations on / and %, chunk size 
MUST BE a power of 2
+      if (Integer.bitCount(chunkSize) != 1) {
+         throw new IllegalArgumentException("chunkSize must be a power of 2");
+      }
+      this.chunkSize = chunkSize;
+      this.chunkMask = chunkSize - 1;
+      this.chunkSizeLog2 = Integer.numberOfTrailingZeros(chunkSize);
+   }
+
+   private long getValidLastIndex() {
+      while (true) {
+         final long lastIndex = this.lastIndex;
+         if (lastIndex == RESIZING) {
+            Thread.yield();
+            continue;
+         }
+         return lastIndex;
+      }
+   }
+
+   /**
+    * It returns the number of elements currently added.
+    */
+   public int size() {
+      return (int) getValidLastIndex();
+   }
+
+   /**
+    * It appends {@code elements} to the collection.
+    */
+   public void addAll(T[] elements) {
+      for (T e : elements) {
+         add(e);
+      }
+   }
+
+   /**
+    * Returns the element at the specified position in this collection or 
{@code null} if not found.
+    */
+   public T get(int index) {
+      if (index < 0) {
+         return null;
+      }
+      //it allows performing fewer cache invalidations on lastIndex if there are bursts of appends
+      long lastIndex = cachedLastIndex;
+      if (index >= lastIndex) {
+         lastIndex = getValidLastIndex();
+         //is it an element beyond the current size?
+         if (index >= lastIndex) {
+            return null;
+         }
+         //publish it for other readers
+         CACHED_LAST_INDEX_UPDATER.lazySet(this, lastIndex);
+      }
+      final AtomicChunk<T> buffer;
+      final int offset;
+      if (index >= chunkSize) {
+         offset = index & chunkMask;
+         //slow path is moved in a separate method
+         buffer = getChunkOf(index, lastIndex);
+      } else {
+         offset = index;
+         buffer = firstBuffer;
+      }
+      return pollElement(buffer, offset);
+   }
+
+   /**
+    * Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+    * ie backward search of a node if needed.
+    */
+   private AtomicChunk<T> getChunkOf(final int index, final long lastIndex) {
+      final int chunkSizeLog2 = this.chunkSizeLog2;
+      //fast division by a power of 2
+      final int chunkIndex = index >> chunkSizeLog2;
+      //size is never allowed to be > Integer.MAX_VALUE
+      final int lastChunkIndex = (int) lastIndex >> chunkSizeLog2;
+      int chunkIndexes = chunkIndex;
+      AtomicChunk<T> buffer = null;
+      boolean forward = true;
+      int distanceFromLastChunkIndex = lastChunkIndex - chunkIndex;
+      //it's worth to go backward from lastChunkIndex?
+      //first check against the value we already have: if it isn't worth it, there's no point in loading the lastBuffer
+      if (distanceFromLastChunkIndex < chunkIndex) {
+         final AtomicChunk<T> lastBuffer = this.lastBuffer;
+         //lastBuffer is a potentially moving, always increasing, target ie better to re-check the distance
+         distanceFromLastChunkIndex = lastBuffer.index - chunkIndex;
+         if (distanceFromLastChunkIndex < chunkIndex) {
+            //we're saving some jumps ie is fine to go backward from here
+            buffer = lastBuffer;
+            chunkIndexes = distanceFromLastChunkIndex;
+            forward = false;
+         }
+      }
+      //start from the first buffer only if needed
+      if (buffer == null) {
+         buffer = firstBuffer;
+      }
+      for (int i = 0; i < chunkIndexes; i++) {
+         //next chunk is always set if below a read lastIndex value
+         //previous chunk is final and can be safely read
+         buffer = forward ? buffer.next : buffer.prev;
+      }
+      return buffer;
+   }
+
+   /**
+    * Appends the specified element to the end of this collection.
+    * @throws NullPointerException if {@code e} is {@code null}
+    * @throws IllegalStateException if the collection already contains {@link Integer#MAX_VALUE} elements
+    **/
+   public void add(T e) {
+      Objects.requireNonNull(e);
+      while (true) {
+         final long lastIndex = this.lastIndex;
+         if (lastIndex != RESIZING) {
+            if (lastIndex == Integer.MAX_VALUE) {
+               throw new IllegalStateException("can't add more than " + Integer.MAX_VALUE + " elements");
+            }
+            //load acquire the current lastBuffer
+            final AtomicChunk<T> lastBuffer = this.lastBuffer;
+            final int offset = (int) (lastIndex & chunkMask);
+            //only the first attempt to add an element to a chunk can attempt to resize
+            if (offset == 0) {
+               if (addChunkAndElement(lastBuffer, lastIndex, e)) {
+                  return;
+               }
+            } else if (LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, lastIndex + 1)) {
+               //this.lastBuffer is the correct buffer to append an element: it is guarded by the lastIndex logic
+               //NOTE: lastIndex is being updated before setting a new value
+               lastBuffer.lazySet(offset, e);
+               return;
+            }
+         }
+         Thread.yield();
+      }
+   }
+
+   private boolean addChunkAndElement(AtomicChunk<T> lastBuffer, long 
lastIndex, T element) {
+      if (!LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, RESIZING)) {
+         return false;
+      }
+      final AtomicChunk<T> newChunk;
+      try {
+         final int index = (int) (lastIndex >> chunkSizeLog2);
+         newChunk = new AtomicChunk<>(index, lastBuffer, chunkSize);
+      } catch (OutOfMemoryError oom) {
+         //unblock lastIndex without updating it
+         LAST_INDEX_UPDATER.lazySet(this, lastIndex);
+         throw oom;
+      }
+      //adding the element to it
+      newChunk.lazySet(0, element);
+      //linking it to the old one, if any
+      if (lastBuffer != null) {
+         //a plain store is enough, given that lastIndex prevents any reader/writer from accessing it
+         lastBuffer.next = newChunk;
+      } else {
+         //it's first one
+         this.firstBuffer = newChunk;
+      }
+      //making it the current produced one
+      this.lastBuffer = newChunk;
+      //store release any previous write and unblock anyone waiting resizing 
to finish
+      LAST_INDEX_UPDATER.lazySet(this, lastIndex + 1);
+      return true;
+   }
+
+   /**
+    * Returns an array containing all of the elements in this collection in 
proper
+    * sequence (from first to last element).<br>
+    * {@code arrayAllocator} will be used to instantiate the array of the 
correct size with the right runtime type.
+    */
+   public T[] toArray(IntFunction<T[]> arrayAllocator) {
+      final long lastIndex = getValidLastIndex();
+      assert lastIndex <= Integer.MAX_VALUE;
+      final int size = (int) lastIndex;
+      final T[] elements = arrayAllocator.apply(size);
+      //fast division by a power of 2
+      final int chunkSize = this.chunkSize;
+      final int chunks = size > chunkSize ? size >> chunkSizeLog2 : 0;
+      AtomicChunk<T> buffer = firstBuffer;
+      int elementIndex = 0;
+      for (int i = 0; i < chunks; i++) {
+         drain(buffer, elements, elementIndex, chunkSize);
+         elementIndex += chunkSize;
+         //the next chunk is always set if we stay below a past size/lastIndex 
value
+         buffer = buffer.next;
+      }
+      final int remaining = chunks > 0 ? (size & chunkMask) : size;
+      drain(buffer, elements, elementIndex, remaining);
+      return elements;
+   }
+
+   //NOTE: lastIndex is being updated BEFORE setting a new value ie the reader side needs to spin until a non-null value is set
+   private static <T> T pollElement(AtomicChunk<T> buffer, int i) {
+      T e;
+      while ((e = buffer.get(i)) == null) {
+         Thread.yield();
+      }
+      return e;
+   }
+
+   private static <T> void drain(AtomicChunk<T> buffer, T[] elements, int 
elementNumber, int length) {
+      for (int j = 0; j < length; j++) {
+         final T e = pollElement(buffer, j);
+         assert e != null;
+         elements[elementNumber] = e;
+         elementNumber++;
+      }
+   }
+
+}
+
diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java
 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java
new file mode 100644
index 0000000000..a4f405971a
--- /dev/null
+++ 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedListTest.java
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConcurrentAppendOnlyChunkedListTest {
+
+   private static final int CHUNK_SIZE = 32;
+   private static final int ELEMENTS = (CHUNK_SIZE * 3) + 1;
+
+   private final ConcurrentAppendOnlyChunkedList<Integer> chunkedList;
+
+   public ConcurrentAppendOnlyChunkedListTest() {
+      chunkedList = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE);
+   }
+
+   @Test(expected = IllegalArgumentException.class)
+   public void shouldFailToCreateNotPowerOf2ChunkSizeCollection() {
+      new ConcurrentAppendOnlyChunkedList<>(3);
+   }
+
+   @Test(expected = IllegalArgumentException.class)
+   public void shouldFailToCreateNegativeChunkSizeCollection() {
+      new ConcurrentAppendOnlyChunkedList<>(-1);
+   }
+
+   @Test
+   public void shouldNumberOfElementsBeTheSameOfTheAddedElements() {
+      final int messages = ELEMENTS;
+      for (int i = 0; i < messages; i++) {
+         Assert.assertEquals(i, chunkedList.size());
+         chunkedList.add((i));
+      }
+      Assert.assertEquals(messages, chunkedList.size());
+   }
+
+   @Test
+   public void shouldNumberOfElementsBeTheSameOfAddAllElements() {
+      final int messages = ELEMENTS;
+      final Integer[] elements = new Integer[messages];
+      for (int i = 0; i < messages; i++) {
+         final Integer element = i;
+         elements[i] = element;
+      }
+      chunkedList.addAll(elements);
+      Assert.assertEquals(messages, chunkedList.size());
+   }
+
+   @Test
+   public void shouldGetReturnNullIfEmpty() {
+      Assert.assertNull(chunkedList.get(0));
+   }
+
+   @Test
+   public void shouldNegativeIndexedGetReturnNull() {
+      Assert.assertNull(chunkedList.get(-1));
+      chunkedList.add(0);
+      Assert.assertNull(chunkedList.get(-1));
+   }
+
+   @Test
+   public void shouldGetReturnNullIfExceedSize() {
+      final int messages = ELEMENTS;
+      for (int i = 0; i < messages; i++) {
+         final Integer element = i;
+         chunkedList.add(element);
+         Assert.assertNull(chunkedList.get(i + 1));
+      }
+   }
+
+   @Test
+   public void shouldGetReturnElementsAccordingToAddOrder() {
+      final int messages = ELEMENTS;
+      final Integer[] elements = new Integer[messages];
+      for (int i = 0; i < messages; i++) {
+         final Integer element = i;
+         elements[i] = element;
+         chunkedList.add(element);
+      }
+      final Integer[] cachedElements = new Integer[messages];
+      for (int i = 0; i < messages; i++) {
+         cachedElements[i] = chunkedList.get(i);
+      }
+      Assert.assertArrayEquals(cachedElements, elements);
+   }
+
+   @Test
+   public void shouldGetReturnElementsAccordingToAddAllOrder() {
+      final int messages = ELEMENTS;
+      final Integer[] elements = new Integer[messages];
+      for (int i = 0; i < messages; i++) {
+         final Integer element = i;
+         elements[i] = element;
+      }
+      chunkedList.addAll(elements);
+      final Integer[] cachedElements = new Integer[messages];
+      for (int i = 0; i < messages; i++) {
+         cachedElements[i] = chunkedList.get(i);
+      }
+      Assert.assertArrayEquals(cachedElements, elements);
+   }
+
+   @Test
+   public void shouldToArrayReturnElementsAccordingToAddOrder() {
+      final int messages = ELEMENTS;
+      final Integer[] elements = new Integer[messages];
+      for (int i = 0; i < messages; i++) {
+         final Integer element = i;
+         elements[i] = element;
+         chunkedList.add(element);
+      }
+      final Integer[] cachedElements = chunkedList.toArray(Integer[]::new);
+      Assert.assertArrayEquals(elements, cachedElements);
+   }
+
+   @Test
+   public void shouldToArrayReturnElementsAccordingToAddAllOrder() {
+      final int messages = ELEMENTS;
+      final Integer[] elements = new Integer[messages];
+      for (int i = 0; i < messages; i++) {
+         final Integer element = i;
+         elements[i] = element;
+      }
+      chunkedList.addAll(elements);
+      final Integer[] cachedElements = chunkedList.toArray(Integer[]::new);
+      Assert.assertArrayEquals(elements, cachedElements);
+   }
+
+   @Test
+   public void shouldToArrayReturnEmptyArrayIfEmpty() {
+      final Integer[] array = chunkedList.toArray(Integer[]::new);
+      Assert.assertArrayEquals(new Integer[0], array);
+   }
+
+}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
index 57d2e27608..6b55439b05 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
@@ -16,30 +16,31 @@
  */
 package org.apache.activemq.artemis.core.paging.cursor.impl;
 
-import java.util.LinkedList;
-import java.util.List;
-
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import 
org.apache.activemq.artemis.utils.collections.ConcurrentAppendOnlyChunkedList;
 import org.jboss.logging.Logger;
 
 /**
  * This is the same as PageCache, however this is for the page that's being 
currently written.
  */
-public class LivePageCacheImpl implements LivePageCache {
+public final class LivePageCacheImpl implements LivePageCache {
 
    private static final Logger logger = 
Logger.getLogger(LivePageCacheImpl.class);
 
-   private final List<PagedMessage> messages = new LinkedList<>();
+   private static final int CHUNK_SIZE = 32;
+
+   private final ConcurrentAppendOnlyChunkedList<PagedMessage> messages;
 
    private final Page page;
 
-   private boolean isLive = true;
+   private volatile boolean isLive = true;
 
    public LivePageCacheImpl(final Page page) {
       this.page = page;
+      this.messages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE);
    }
 
    @Override
@@ -48,54 +49,49 @@ public long getPageId() {
    }
 
    @Override
-   public synchronized int getNumberOfMessages() {
+   public int getNumberOfMessages() {
       return messages.size();
    }
 
    @Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
       // This method shouldn't be called on liveCache, but we will provide the 
implementation for it anyway
-      for (PagedMessage msg : messages) {
-         addLiveMessage(msg);
+      for (PagedMessage message : messages) {
+         addLiveMessage(message);
       }
    }
 
    @Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-      if (messageNumber < messages.size()) {
-         return messages.get(messageNumber);
-      } else {
-         return null;
-      }
+   public PagedMessage getMessage(int messageNumber) {
+      return messages.get(messageNumber);
    }
 
    @Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
       return isLive;
    }
 
    @Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage message) {
       if (message.getMessage().isLargeMessage()) {
          ((LargeServerMessage) 
message.getMessage()).incrementDelayDeletionCount();
       }
-      this.messages.add(message);
+      messages.add(message);
    }
 
    @Override
-   public synchronized void close() {
+   public void close() {
       logger.tracef("Closing %s", this);
       this.isLive = false;
    }
 
    @Override
-   public synchronized PagedMessage[] getMessages() {
-      return messages.toArray(new PagedMessage[messages.size()]);
+   public PagedMessage[] getMessages() {
+      return messages.toArray(PagedMessage[]::new);
    }
 
    @Override
    public String toString() {
-      return "LivePacheCacheImpl::page=" + page.getPageId() + " number of 
messages=" + messages.size() + " isLive = " +
-         isLive;
+      return "LivePacheCacheImpl::page=" + page.getPageId() + " number of 
messages=" + getNumberOfMessages() + " isLive = " + isLive;
    }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 7aab4e4248..ebf69d6fee 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -461,11 +461,22 @@ public void confirmPosition(final Transaction tx, final 
PagePosition position) t
 
    }
 
+   private void confirmPosition(final Transaction tx, final PagePosition 
position, final long persistentSize) throws Exception {
+      // if the cursor is persistent
+      if (persistent) {
+         store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, 
position);
+      }
+      installTXCallback(tx, position, persistentSize);
+   }
+
    @Override
    public void ackTx(final Transaction tx, final PagedReference reference) 
throws Exception {
-      confirmPosition(tx, reference.getPosition());
+      //pre-calculate persistentSize
+      final long persistentSize = getPersistentSize(reference);
+
+      confirmPosition(tx, reference.getPosition(), persistentSize);
 
-      counter.increment(tx, -1, -getPersistentSize(reference));
+      counter.increment(tx, -1, -persistentSize);
 
       PageTransactionInfo txInfo = getPageTransaction(reference);
       if (txInfo != null) {
@@ -864,11 +875,16 @@ private PageCursorInfo processACK(final PagePosition pos) 
{
       return info;
    }
 
+   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
+      installTXCallback(tx, position, -1);
+   }
+
    /**
     * @param tx
     * @param position
+    * @param persistentSize if negative it needs to be calculated on the fly
     */
-   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
+   private void installTXCallback(final Transaction tx, final PagePosition 
position, final long persistentSize) {
       if (position.getRecordID() >= 0) {
          // It needs to persist, otherwise the cursor will return to the fist 
page position
          tx.setContainsPersistent();
@@ -876,9 +892,15 @@ private void installTXCallback(final Transaction tx, final 
PagePosition position
 
       PageCursorInfo info = getPageInfo(position);
       PageCache cache = info.getCache();
-      long size = 0;
       if (cache != null) {
-         size = getPersistentSize(cache.getMessage(position.getMessageNr()));
+         final long size;
+         if (persistentSize < 0) {
+            //cache.getMessage is potentially expensive depending
+            //on the current cache size and which message is queried
+            size = 
getPersistentSize(cache.getMessage(position.getMessageNr()));
+         } else {
+            size = persistentSize;
+         }
          position.setPersistentSize(size);
       }
 


With regards,
Apache Git Services

Reply via email to