This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 41efeb2  ARTEMIS-2845 ConcurrentAppendOnlyChunkedList cannot be 
queried while resizing
     new 1ebfc8f  This closes #3218
41efeb2 is described below

commit 41efeb26697fa65d9175ca0e71b9d67118017e7d
Author: Francesco Nigro <[email protected]>
AuthorDate: Tue Jul 14 19:05:47 2020 +0200

    ARTEMIS-2845 ConcurrentAppendOnlyChunkedList cannot be queried while 
resizing
---
 .../ConcurrentAppendOnlyChunkedList.java           | 59 ++++++++++------------
 1 file changed, 27 insertions(+), 32 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
index 49981fd..c288e16 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
@@ -50,13 +50,12 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
 
    private final int chunkSizeLog2;
 
-   private static final long RESIZING = -1;
-
    private AtomicChunk<T> firstBuffer = null;
 
    private AtomicChunk<T> lastBuffer = null;
 
    //it is both the current index of the next element to be claimed and the 
current size of the collection
+   //it's using a parity bit to mark the rotation state ie size === lastIndex 
>> 1
    private volatile long lastIndex = 0;
 
    //cached view of lastIndex used to avoid invalidating lastIndex while being 
updated by the appends
@@ -80,14 +79,7 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
    }
 
    private long getValidLastIndex() {
-      while (true) {
-         final long lastIndex = this.lastIndex;
-         if (lastIndex == RESIZING) {
-            Thread.yield();
-            continue;
-         }
-         return lastIndex;
-      }
+      return this.lastIndex >> 1;
    }
 
    /**
@@ -185,36 +177,39 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
       Objects.requireNonNull(e);
       while (true) {
          final long lastIndex = this.lastIndex;
-         if (lastIndex != RESIZING) {
-            if (lastIndex == Integer.MAX_VALUE) {
-               throw new IllegalStateException("can't add more then " + 
Integer.MAX_VALUE + " elements");
-            }
-            //load acquire the current lastBuffer
-            final AtomicChunk<T> lastBuffer = this.lastBuffer;
-            final int offset = (int) (lastIndex & chunkMask);
-            //only the first attempt to add an element to a chunk can attempt 
to resize
-            if (offset == 0) {
-               if (addChunkAndElement(lastBuffer, lastIndex, e)) {
-                  return;
-               }
-            } else if (LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, 
lastIndex + 1)) {
-               //this.lastBuffer is the correct buffer to append a element: it 
is guarded by the lastIndex logic
-               //NOTE: lastIndex is being updated before setting a new value
-               lastBuffer.lazySet(offset, e);
+         // lower bit is indicative of appending
+         if ((lastIndex & 1) == 1) {
+            continue;
+         }
+         final long validLastIndex = lastIndex >> 1;
+         if (validLastIndex == Integer.MAX_VALUE) {
+            throw new IllegalStateException("can't add more then " + 
Integer.MAX_VALUE + " elements");
+         }
+         //load acquire the current lastBuffer
+         final AtomicChunk<T> lastBuffer = this.lastBuffer;
+         final int offset = (int) (validLastIndex & chunkMask);
+         //only the first attempt to add an element to a chunk can attempt to 
resize
+         if (offset == 0) {
+            if (addChunkAndElement(lastBuffer, lastIndex, validLastIndex, e)) {
                return;
             }
+         } else if (LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, 
lastIndex + 2)) {
+            //this.lastBuffer is the correct buffer to append a element: it is 
guarded by the lastIndex logic
+            //NOTE: lastIndex is being updated before setting a new value
+            lastBuffer.lazySet(offset, e);
+            return;
          }
-         Thread.yield();
       }
    }
 
-   private boolean addChunkAndElement(AtomicChunk<T> lastBuffer, long 
lastIndex, T element) {
-      if (!LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, RESIZING)) {
+   private boolean addChunkAndElement(AtomicChunk<T> lastBuffer, long 
lastIndex, long validLastIndex, T element) {
+      // adding 1 will set the lower bit
+      if (!LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, lastIndex + 1)) {
          return false;
       }
       final AtomicChunk<T> newChunk;
       try {
-         final int index = (int) (lastIndex >> chunkSizeLog2);
+         final int index = (int) (validLastIndex >> chunkSizeLog2);
          newChunk = new AtomicChunk<>(index, lastBuffer, chunkSize);
       } catch (OutOfMemoryError oom) {
          //unblock lastIndex without updating it
@@ -234,7 +229,8 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
       //making it the current produced one
       this.lastBuffer = newChunk;
       //store release any previous write and unblock anyone waiting resizing 
to finish
-      LAST_INDEX_UPDATER.lazySet(this, lastIndex + 1);
+      //and would clean the lower bit
+      LAST_INDEX_UPDATER.lazySet(this, lastIndex + 2);
       return true;
    }
 
@@ -268,7 +264,6 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
    private static <T> T pollElement(AtomicChunk<T> buffer, int i) {
       T e;
       while ((e = buffer.get(i)) == null) {
-         Thread.yield();
       }
       return e;
    }

Reply via email to