[
https://issues.apache.org/jira/browse/ARTEMIS-2224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738723#comment-16738723
]
ASF GitHub Bot commented on ARTEMIS-2224:
-----------------------------------------
Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567995
--- Diff:
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide
the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allow to perform less cache invalidations vs producerIndex if
there are bursts of appends
+ long size = lastSeenProducerIndex;
+ if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+ Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+ return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+ }
+ final AtomicChunk<PagedMessage> buffer;
+ final int offset;
+ if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+ } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+ }
+ //NOTE: producerIndex is being updated before setting a new value ie
on consumer side need to spin until a not null value is set
+ PagedMessage msg;
+ while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+ }
+ return msg;
+ }
+
+ /**
+ * Implements a lock-free version of the optimization used on {@link
java.util.LinkedList#get(int)} to speed up queries
+ * ie backward search of a node if needed.
+ */
+ private AtomicChunk<PagedMessage> jump(final int messageNumber, final
long size) {
+ //fast division by a power of 2
+ final int jumps = messageNumber >> chunkSizeLog2;
+ //size is never allowed to be > Integer.MAX_VALUE
+ final int lastChunkIndex = (int) size >> chunkSizeLog2;
+ int requiredJumps = jumps;
+ AtomicChunk<PagedMessage> jumpBuffer = null;
+ boolean jumpForward = true;
+ int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+ //it's worth to go backward from lastChunkIndex?
+ //trying first to check against the value we already have: if it
won't worth, won't make sense to load the producerBuffer
+ if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk<PagedMessage> producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+ //we're saving some jumps ie is fine to go backward from here
+ jumpBuffer = producer;
+ requiredJumps = distanceFromLastChunkIndex;
+ jumpForward = false;
+ }
+ }
+ //start from the consumer buffer only is needed
+ if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+ }
+ for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+ }
+ return jumpBuffer;
}
@Override
- public synchronized boolean isLive() {
+ public boolean isLive() {
return isLive;
}
@Override
- public synchronized void addLiveMessage(PagedMessage message) {
+ public void addLiveMessage(PagedMessage message) {
if (message.getMessage().isLargeMessage()) {
((LargeServerMessage)
message.getMessage()).incrementDelayDeletionCount();
}
- this.messages.add(message);
+ while (true) {
+ final long pIndex = producerIndex;
+ if (pIndex != RESIZING) {
+ if (pIndex == Integer.MAX_VALUE) {
+ throw new IllegalStateException("can't add more then " +
Integer.MAX_VALUE + " messages");
+ }
+ //load acquire the current producer buffer
+ final AtomicChunk<PagedMessage> producerBuffer =
this.producerBuffer;
+ final int pOffset = (int) (pIndex & chunkMask);
+ //only the first message to a chunk can attempt to resize
+ if (pOffset == 0) {
+ if (appendChunkAndMessage(producerBuffer, pIndex, message))
{
+ return;
+ }
+ } else if (PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex,
pIndex + 1)) {
+ //this.producerBuffer is the correct buffer to append a
message: it is guarded by the producerIndex logic
+ //NOTE: producerIndex is being updated before setting a new
value
+ producerBuffer.lazySet(pOffset, message);
+ return;
+ }
+ }
+ Thread.yield();
+ }
+ }
+
+ private boolean appendChunkAndMessage(AtomicChunk<PagedMessage>
producerBuffer, long pIndex, PagedMessage message) {
+ if (!PRODUCER_INDEX_UPDATER.compareAndSet(this, pIndex, RESIZING)) {
+ return false;
+ }
+ final AtomicChunk<PagedMessage> newChunk;
+ try {
+ final int index = (int) (pIndex >> chunkSizeLog2);
+ newChunk = new AtomicChunk<>(index, producerBuffer, chunkSize);
+ } catch (OutOfMemoryError oom) {
+ //unblock producerIndex without updating it
+ PRODUCER_INDEX_UPDATER.lazySet(this, pIndex);
+ throw oom;
+ }
+ //adding the message to it
+ newChunk.lazySet(0, message);
+ //linking it to the old one, if any
+ if (producerBuffer != null) {
+ //a plain store is enough, given that producerIndex prevents any
reader/writer to access it
+ producerBuffer.next = newChunk;
+ } else {
+ //it's first one
+ this.consumerBuffer = newChunk;
+ }
+ //making it the current produced one
+ this.producerBuffer = newChunk;
+ //store release any previous write and "unblock" anyone waiting
resizing to finish
+ PRODUCER_INDEX_UPDATER.lazySet(this, pIndex + 1);
+ return true;
}
@Override
- public synchronized void close() {
+ public void close() {
logger.tracef("Closing %s", this);
this.isLive = false;
}
+ private static PagedMessage[] EMPTY_MSG = null;
+
+ private static PagedMessage[] noMessages() {
+ //it is a benign race: no need strong initializations here
+ PagedMessage[] empty = EMPTY_MSG;
+ if (empty != null) {
+ return empty;
+ } else {
+ empty = new PagedMessage[0];
--- End diff --
Why not simply make noMessages return a shared static empty array, e.g. a
single PagedMessage[0] constant — similar in nature to
https://android.googlesource.com/platform/libcore/+/jb-mr2-release/luni/src/main/java/libcore/util/EmptyArray.java,
which saves instantiating it on every call.
It would also avoid a race condition that the current code seems to have with
its lazy initialization: currently, if two threads call noMessages concurrently,
it's possible for two PagedMessage[0] objects to be created.
> Reduce contention on LivePageCacheImpl
> --------------------------------------
>
> Key: ARTEMIS-2224
> URL: https://issues.apache.org/jira/browse/ARTEMIS-2224
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Components: Broker
> Affects Versions: 2.7.0
> Reporter: Francesco Nigro
> Assignee: Francesco Nigro
> Priority: Major
>
> Has been measured that LIvePageCacheImpl operations are a source of
> contention on producer side while paging.
> This contention decrease the scalability of the broker in an evident way
> while using topics, because the page cache is been accessed concurrently by
> several producers to ack transactions while the messages are being appended.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)