This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 41efeb2 ARTEMIS-2845 ConcurrentAppendOnlyChunkedList cannot be
queried while resizing
new 1ebfc8f This closes #3218
41efeb2 is described below
commit 41efeb26697fa65d9175ca0e71b9d67118017e7d
Author: Francesco Nigro <[email protected]>
AuthorDate: Tue Jul 14 19:05:47 2020 +0200
ARTEMIS-2845 ConcurrentAppendOnlyChunkedList cannot be queried while
resizing
---
.../ConcurrentAppendOnlyChunkedList.java | 59 ++++++++++------------
1 file changed, 27 insertions(+), 32 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
index 49981fd..c288e16 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
@@ -50,13 +50,12 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
private final int chunkSizeLog2;
- private static final long RESIZING = -1;
-
private AtomicChunk<T> firstBuffer = null;
private AtomicChunk<T> lastBuffer = null;
//it is both the current index of the next element to be claimed and the
current size of the collection
+ //it's using a parity bit to mark the rotation state ie size === lastIndex
>> 1
private volatile long lastIndex = 0;
//cached view of lastIndex used to avoid invalidating lastIndex while being
updated by the appends
@@ -80,14 +79,7 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
}
private long getValidLastIndex() {
- while (true) {
- final long lastIndex = this.lastIndex;
- if (lastIndex == RESIZING) {
- Thread.yield();
- continue;
- }
- return lastIndex;
- }
+ return this.lastIndex >> 1;
}
/**
@@ -185,36 +177,39 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
Objects.requireNonNull(e);
while (true) {
final long lastIndex = this.lastIndex;
- if (lastIndex != RESIZING) {
- if (lastIndex == Integer.MAX_VALUE) {
- throw new IllegalStateException("can't add more then " +
Integer.MAX_VALUE + " elements");
- }
- //load acquire the current lastBuffer
- final AtomicChunk<T> lastBuffer = this.lastBuffer;
- final int offset = (int) (lastIndex & chunkMask);
- //only the first attempt to add an element to a chunk can attempt
to resize
- if (offset == 0) {
- if (addChunkAndElement(lastBuffer, lastIndex, e)) {
- return;
- }
- } else if (LAST_INDEX_UPDATER.compareAndSet(this, lastIndex,
lastIndex + 1)) {
- //this.lastBuffer is the correct buffer to append a element: it
is guarded by the lastIndex logic
- //NOTE: lastIndex is being updated before setting a new value
- lastBuffer.lazySet(offset, e);
+ // lower bit is indicative of appending
+ if ((lastIndex & 1) == 1) {
+ continue;
+ }
+ final long validLastIndex = lastIndex >> 1;
+ if (validLastIndex == Integer.MAX_VALUE) {
+ throw new IllegalStateException("can't add more then " +
Integer.MAX_VALUE + " elements");
+ }
+ //load acquire the current lastBuffer
+ final AtomicChunk<T> lastBuffer = this.lastBuffer;
+ final int offset = (int) (validLastIndex & chunkMask);
+ //only the first attempt to add an element to a chunk can attempt to
resize
+ if (offset == 0) {
+ if (addChunkAndElement(lastBuffer, lastIndex, validLastIndex, e)) {
return;
}
+ } else if (LAST_INDEX_UPDATER.compareAndSet(this, lastIndex,
lastIndex + 2)) {
+ //this.lastBuffer is the correct buffer to append a element: it is
guarded by the lastIndex logic
+ //NOTE: lastIndex is being updated before setting a new value
+ lastBuffer.lazySet(offset, e);
+ return;
}
- Thread.yield();
}
}
- private boolean addChunkAndElement(AtomicChunk<T> lastBuffer, long
lastIndex, T element) {
- if (!LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, RESIZING)) {
+ private boolean addChunkAndElement(AtomicChunk<T> lastBuffer, long
lastIndex, long validLastIndex, T element) {
+ // adding 1 will set the lower bit
+ if (!LAST_INDEX_UPDATER.compareAndSet(this, lastIndex, lastIndex + 1)) {
return false;
}
final AtomicChunk<T> newChunk;
try {
- final int index = (int) (lastIndex >> chunkSizeLog2);
+ final int index = (int) (validLastIndex >> chunkSizeLog2);
newChunk = new AtomicChunk<>(index, lastBuffer, chunkSize);
} catch (OutOfMemoryError oom) {
//unblock lastIndex without updating it
@@ -234,7 +229,8 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
//making it the current produced one
this.lastBuffer = newChunk;
//store release any previous write and unblock anyone waiting resizing
to finish
- LAST_INDEX_UPDATER.lazySet(this, lastIndex + 1);
+ //and would clean the lower bit
+ LAST_INDEX_UPDATER.lazySet(this, lastIndex + 2);
return true;
}
@@ -268,7 +264,6 @@ public final class ConcurrentAppendOnlyChunkedList<T> {
private static <T> T pollElement(AtomicChunk<T> buffer, int i) {
T e;
while ((e = buffer.get(i)) == null) {
- Thread.yield();
}
return e;
}