[
https://issues.apache.org/jira/browse/ARTEMIS-2224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738849#comment-16738849
]
ASF GitHub Bot commented on ARTEMIS-2224:
-----------------------------------------
Github user franz1981 commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2494#discussion_r246599701
--- Diff:
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
---
@@ -48,54 +82,228 @@ public long getPageId() {
}
@Override
- public synchronized int getNumberOfMessages() {
- return messages.size();
+ public int getNumberOfMessages() {
+ while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+ Thread.yield();
+ continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
}
@Override
- public synchronized void setMessages(PagedMessage[] messages) {
+ public void setMessages(PagedMessage[] messages) {
// This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
for (PagedMessage msg : messages) {
addLiveMessage(msg);
}
}
@Override
- public synchronized PagedMessage getMessage(int messageNumber) {
- if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
- } else {
+ public PagedMessage getMessage(int messageNumber) {
+ if (messageNumber < 0) {
return null;
}
+ //it allows performing fewer cache invalidations vs producerIndex if there are bursts of appends
+ long size = lastSeenProducerIndex;
+ if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+ Thread.yield();
+ }
+ //is the message over the current size?
+ if (messageNumber >= size) {
+ return null;
+ }
+ //publish it for other consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+ }
+ final AtomicChunk<PagedMessage> buffer;
+ final int offset;
+ if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+ } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+ }
+ //NOTE: producerIndex is updated before the new value is set, i.e. the consumer side needs to spin until a non-null value is set
+ PagedMessage msg;
+ while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+ }
+ return msg;
+ }
+
+ /**
+ * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries,
+ * ie backward search of a node if needed.
+ */
+ private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
+ //fast division by a power of 2
+ final int jumps = messageNumber >> chunkSizeLog2;
--- End diff --
eheh you're right :P, but the code of this collection came from a version where `chunkSize` was not a static final constant: in the new version it is clearer why I have done it
> Reduce contention on LivePageCacheImpl
> --------------------------------------
>
> Key: ARTEMIS-2224
> URL: https://issues.apache.org/jira/browse/ARTEMIS-2224
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Components: Broker
> Affects Versions: 2.7.0
> Reporter: Francesco Nigro
> Assignee: Francesco Nigro
> Priority: Major
>
> It has been measured that LivePageCacheImpl operations are a source of
> contention on the producer side while paging.
> This contention decreases the scalability of the broker in an evident way
> when using topics, because the page cache is accessed concurrently by
> several producers to ack transactions while messages are being appended.
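The patch above removes the `synchronized` getters in favor of a volatile `producerIndex` guarded by a `RESIZING` sentinel value. A minimal standalone sketch of that reader/writer pattern (this is not the Artemis class; `SentinelSize` and `growTo` are hypothetical names used only for illustration):

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Sentinel-guarded size read: a writer parks the index at RESIZING while
// it grows the backing storage, and readers spin until a real value is
// published again. This mirrors getNumberOfMessages() in the diff.
final class SentinelSize {
   static final long RESIZING = -1;

   private volatile long producerIndex;
   private static final AtomicLongFieldUpdater<SentinelSize> INDEX =
      AtomicLongFieldUpdater.newUpdater(SentinelSize.class, "producerIndex");

   // writer side: mark the index busy, grow, then publish the new size
   void growTo(long newSize) {
      INDEX.lazySet(this, RESIZING);   // readers now spin
      // ... allocate/link a new chunk here ...
      INDEX.lazySet(this, newSize);    // publish, releasing readers
   }

   // reader side: spin while a resize is in flight, clamp to int
   int size() {
      while (true) {
         final long size = producerIndex;
         if (size == RESIZING) {
            Thread.yield();
            continue;
         }
         return (int) Math.min(size, Integer.MAX_VALUE);
      }
   }
}
```

A plain volatile read on the hot path replaces monitor acquisition, so concurrent producers no longer serialize on the cache just to query its size.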
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)