gemmellr commented on code in PR #4101:
URL: https://github.com/apache/activemq-artemis/pull/4101#discussion_r895631294
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java:
##########
@@ -61,24 +63,31 @@ public Runnable getTask() {
@Override
public int increment() {
- return uses.incrementAndGet();
+ return useUpdater.incrementAndGet(this);
}
@Override
public int decrement() {
- int value = uses.decrementAndGet();
+ int value = useUpdater.decrementAndGet(this);
if (value == 0) {
execute();
}
return value;
}
+ public void reset() {
+ execute();
Review Comment:
Should reset() execute? "Resetting" suggests we are starting from scratch,
which does not imply that the task that would have run under normal
conditions should now be run.
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java:
##########
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
Review Comment:
Should be a plain comment, not javadoc.
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java:
##########
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.function.Consumer;
+
+public class EmptyList<E> implements LinkedList<E> {
+
+
+ private static final LinkedList EMPTY_LIST = new EmptyList();
+
+ public static final <T> LinkedList<T> getEmptyList() {
+ return (LinkedList<T>) EMPTY_LIST;
+ }
+
+ private EmptyList() {
+ }
+
+
+
+
+ @Override
+ public void addHead(E e) {
+ throw new IllegalStateException("method not supported");
+ }
+
+ @Override
+ public void addTail(E e) {
+ throw new IllegalStateException("method not supported");
Review Comment:
UnsupportedOperationException (UOE) would seem more appropriate given the
message. Same for the later instances below.
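For illustration only, a minimal sketch of the convention being suggested.
The JDK's own immutable empty list rejects mutation with
UnsupportedOperationException, whereas IllegalStateException usually signals
a call made at the wrong time. The class and method names below are
hypothetical and are not part of the PR.

```java
import java.util.Collections;
import java.util.List;

final class EmptyListDemo {

   // Returns true if adding to the JDK's immutable empty list is rejected
   // with UnsupportedOperationException.
   static boolean jdkEmptyListRejectsAdd() {
      List<String> empty = Collections.emptyList();
      try {
         empty.add("x");
         return false;
      } catch (UnsupportedOperationException expected) {
         return true;
      }
   }

   // Hypothetical mutator following the same convention: the operation is
   // never supported, regardless of the object's state.
   static <E> void addTail(E e) {
      throw new UnsupportedOperationException("addTail is not supported on an empty list");
   }
}
```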
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java:
##########
@@ -55,4 +58,13 @@ public interface PagedMessage extends EncodingSupport {
* at the same amount of bytes it used. In some cases it may need to add headers in AMQP
* or extra data that may affect the outcome of getEncodeSize() */
int getStoredSize();
+
+ long getPageNr();
+
+ PagedMessage setPageNr(long pageNr);
+
+ int getMessageNr();
+
+ PagedMessage setMessageNr(int messageNr);
Review Comment:
Given the length of some of the other method names, I don't think abbreviating
"Number" to "Nr" really helps much.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java:
##########
@@ -1172,15 +1044,9 @@ public final class PageCursorInfo implements ConsumedPage {
private final long pageId;
- // Confirmed ACKs on this page
- private Set<PagePosition> acks = Collections.synchronizedSet(new LinkedHashSet<PagePosition>());
-
- private WeakReference<PageCache> cache;
+ private IntObjectHashMap<PagePosition> acks = new IntObjectHashMap<>();
- private Set<PagePosition> removedReferences = new ConcurrentHashSet<>();
-
- // The page was live at the time of the creation
- private final boolean wasLive;
+ private IntObjectHashMap removedReferences = new IntObjectHashMap<>();
Review Comment:
Why is this one untyped when the other isn't?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java:
##########
@@ -237,6 +270,8 @@ public String toString() {
return "PagedMessageImpl [queueIDs=" + Arrays.toString(queueIDs) +
", transactionID=" +
transactionID +
+ ", page=" +
+ pageNr + "#" + pageNr +
Review Comment:
pageNr is being passed twice; should one of them be the position in the page?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java:
##########
@@ -32,43 +32,59 @@ public class ReplicationPageEventMessage extends PacketImpl
{
*/
private boolean isDelete;
- public ReplicationPageEventMessage() {
+ private final boolean useLong;
+
+ public ReplicationPageEventMessage(boolean useLong) {
super(PacketImpl.REPLICATION_PAGE_EVENT);
+ this.useLong = useLong;
}
- public ReplicationPageEventMessage(final SimpleString storeName, final int pageNumber, final boolean isDelete) {
- this();
+ public ReplicationPageEventMessage(final SimpleString storeName, final long pageNumber, final boolean isDelete, final boolean useLong) {
+ this(useLong);
this.pageNumber = pageNumber;
this.isDelete = isDelete;
this.storeName = storeName;
}
@Override
public int expectedEncodeSize() {
- return PACKET_HEADERS_SIZE +
- SimpleString.sizeofString(storeName) + //
buffer.writeSimpleString(storeName);
- DataConstants.SIZE_INT + // buffer.writeInt(pageNumber);
- DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(isDelete);
+ if (useLong) {
+ return PACKET_HEADERS_SIZE + SimpleString.sizeofString(storeName) +
// buffer.writeSimpleString(storeName);
+ DataConstants.SIZE_LONG + // buffer.writeLong(pageNumber);
+ DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(isDelete);
+ } else {
+ return PACKET_HEADERS_SIZE + SimpleString.sizeofString(storeName) +
// buffer.writeSimpleString(storeName);
+ DataConstants.SIZE_INT + // buffer.writeInt(pageNumber);
+ DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(isDelete);
+ }
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(storeName);
- buffer.writeInt(pageNumber);
+ if (useLong) {
+ buffer.writeLong(pageNumber);
+ } else {
+ buffer.writeInt((int) pageNumber);
Review Comment:
Is this being validated somewhere else? If not, it should be checked here and
blow up if the effective value can't fit, rather than sending the wrong value.
I seem to recall commenting on another case where the broker could jump
forward and burn quite significant amounts of IDs. That may have been for
message IDs though, I can't quite recall. Either way, it is simpler to be safe
here now than to waste time tracking down bizarre issues later.
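A minimal sketch of the kind of check meant here, under stated assumptions:
`PageNumberEncoding` and `writePageNumber` are hypothetical names, and
java.nio.ByteBuffer stands in for ActiveMQBuffer. Math.toIntExact throws
ArithmeticException when the long does not fit in an int, so an out-of-range
page number fails loudly instead of being silently truncated by the cast.

```java
import java.nio.ByteBuffer;

final class PageNumberEncoding {

   // Hypothetical helper: writes the page number as a long when the peer
   // supports it, otherwise as an int, failing fast if it cannot fit.
   static void writePageNumber(ByteBuffer buffer, long pageNumber, boolean useLong) {
      if (useLong) {
         buffer.putLong(pageNumber);
      } else {
         // Throws ArithmeticException on overflow instead of truncating.
         buffer.putInt(Math.toIntExact(pageNumber));
      }
   }
}
```

Whether ArithmeticException or a more specific exception type is the right
failure is a design choice for the PR; the point is only that the narrowing is
checked.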
##########
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java:
##########
@@ -44,6 +44,8 @@ public class PacketImpl implements Packet {
// 2.21.0
public static final int ARTEMIS_2_21_0_VERSION = 132;
+ public static final int ARTEMIS_2_24_0_VERSION = 133;
Review Comment:
For whatever reason, all the other fields have a comment with the version
above them. This should probably be made consistent one way or the other.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java:
##########
@@ -50,14 +47,24 @@ public class PagedReferenceImpl extends
LinkedListImpl.Node<PagedReferenceImpl>
private int messageEstimate = -1;
+ volatile PagePosition position;
+
+ @Override
+ public PagePosition getPosition() {
+ if (position == null) {
+ position = getPagedMessage().newPositionObject();
+ }
+ return position;
Review Comment:
If going with the volatile (which, per the previous comment, may be
unnecessary or still insufficient,
https://github.com/apache/activemq-artemis/pull/4101/files#r892248712), then
this use should probably assign it to a local variable first and
check/return that, to avoid re-reading it upon the actual return.
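A minimal sketch of the local-variable pattern, assuming the volatile field is
kept. `LazyPosition` is a hypothetical stand-in for PagedReferenceImpl, Object
stands in for PagePosition, and newPositionObject() here is a placeholder for
getPagedMessage().newPositionObject().

```java
final class LazyPosition {

   private volatile Object position;

   // Placeholder for getPagedMessage().newPositionObject().
   private Object newPositionObject() {
      return new Object();
   }

   public Object getPosition() {
      Object local = position;        // single volatile read
      if (local == null) {
         local = newPositionObject();
         position = local;            // single volatile write
      }
      return local;                   // no second read of the field
   }
}
```

This still allows two racing threads to each create and publish their own
instance; it only removes the redundant volatile read on return.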
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java:
##########
@@ -102,6 +103,21 @@ public void addHead(E e) {
size++;
}
+ @Override
+ public E get(int position) {
+ Node<E> current = head.next;
+
+ for (int i = 0; i < position && current != null; i++) {
+ current = current.next;
+ }
+
+ if (current == null) {
+ throw new IndexOutOfBoundsException();
+ }
Review Comment:
The exception message could/should detail what was asked for (position) and
how far it got (i, with its declaration moved outside the for loop) to help
with diagnosis if it ever gets thrown.
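A minimal sketch of what that could look like. `SimpleLinkedList` is a
hypothetical, stripped-down list with a sentinel head node, not the real
LinkedListImpl, and the message wording is only an example.

```java
final class SimpleLinkedList<E> {

   private static final class Node<E> {
      final E value;
      Node<E> next;

      Node(E value) {
         this.value = value;
      }
   }

   // Sentinel head, so the first element is head.next.
   private final Node<E> head = new Node<>(null);
   private Node<E> tail = head;

   void addTail(E e) {
      Node<E> node = new Node<>(e);
      tail.next = node;
      tail = node;
   }

   E get(int position) {
      Node<E> current = head.next;
      int i = 0; // declared outside the loop so it can be reported below
      for (; i < position && current != null; i++) {
         current = current.next;
      }
      if (current == null) {
         throw new IndexOutOfBoundsException(
            "requested position " + position + " but list ended after " + i + " element(s)");
      }
      return current.value;
   }
}
```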
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java:
##########
@@ -1612,36 +1453,57 @@ private PagedReference moveNext() {
match = true;
}
} else if (!browsing && ignored) {
- positionIgnored(message.getPosition());
+
positionIgnored(message.getPagedMessage().newPositionObject());
}
}
while (!match);
if (message != null) {
- lastOperation = lastPosition;
+ lastDelivery = message;
}
return message;
}
}
+
+ private PagedReference internalGetNext() {
+ for (;;) {
+ PagedMessage message = currentPageIterator.hasNext() ?
currentPageIterator.next() : null;
+ logger.tracef("CursorIterator::internalGetNext:: new reference
%s", message);
+ if (message != null) {
+ return cursorProvider.newReference(message,
PageSubscriptionImpl.this);
+ }
+
+ if (currentPage.getPageId() < pageStore.getCurrentWritingPage()) {
+ if (logger.isTraceEnabled()) {
+ logger.tracef("CursorIterator::moving to currentPage %s",
currentPage.getPageId() + 1);
Review Comment:
First log message has prefix "CursorIterator::internalGetNext::" but second
does not. Should it?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -3101,29 +3108,21 @@ protected void removeMessageReference(ConsumerHolder<?
extends Consumer> holder,
refRemoved(ref);
}
- private void checkDepage(boolean noWait) {
- if (pageIterator != null && pageSubscription.isPaging() &&
!depagePending && needsDepage() && (noWait ? pageIterator.tryNext() > 0 :
pageIterator.hasNext())) {
+ private void checkDepage() {
+ if (pageIterator != null && pageSubscription.isPaging() &&
!depagePending && needsDepage() && pageIterator.tryNext() !=
PageIterator.NextResult.noElements) {
scheduleDepage(false);
}
}
/**
- * This is a common check we do before scheduling depaging.. or while
depaging.
- * Before scheduling a depage runnable we verify if it fits / needs
depaging.
- * We also check for while needsDepage While depaging.
- * This is just to avoid a copy & paste dependency
+ *
+ * This is a check on page sizing.
*
* @return
*/
private boolean needsDepage() {
- return queueMemorySize.get() <
pageSubscription.getPagingStore().getMaxSize() &&
- /**
- * In most cases, one depage round following by at most
MAX_SCHEDULED_RUNNERS deliver round,
- * thus we just need to read MAX_DELIVERIES_IN_LOOP *
MAX_SCHEDULED_RUNNERS messages. If we read too much, the message reference
- * maybe discarded by gc collector in response to memory demand and
we need to read it again at
- * a great cost when delivering.
- */
- intermediateMessageReferences.size() + messageReferences.size() <
MAX_DEPAGE_NUM;
+ return queueMemorySize.getSize() <
pageSubscription.getPagingStore().getMaxPageReadBytes() &&
+ queueMemorySize.getElements() <
pageSubscription.getPagingStore().getMaxPageReadMessages();
Review Comment:
Is this right? Isn't queueMemorySize tracking the total size/count of things
in the queue? If so, comparing that to the size/count that can be read from a
page at one time doesn't seem like the right metric for deciding whether it is
able to depage.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ReaderContext.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.paging.cursor;
+
+import org.apache.activemq.artemis.core.paging.impl.Page;
+
+public class ReaderContext {
+
+ Page page;
+ int position;
+ int messageNr;
+
+ public void reset() {
+ position = messageNr = 0;
+ }
+
+ public int getPosition() {
+ return position;
+ }
+
+ public ReaderContext setPosition(int position) {
+ this.position = position;
+ return this;
+ }
+
+ public int getMessageNr() {
+ return messageNr;
+ }
+
+ public ReaderContext setMessageNr(int messageNr) {
+ this.messageNr = messageNr;
+ return this;
+ }
Review Comment:
Ditto re: Number vs Nr
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java:
##########
@@ -25,45 +25,60 @@
public class ReplicationPageWriteMessage extends PacketImpl implements
MessagePacketI {
- private int pageNumber;
+ protected long pageNumber;
- private PagedMessage pagedMessage;
+ protected PagedMessage pagedMessage;
- public ReplicationPageWriteMessage() {
+ final boolean useLong;
+
+ public ReplicationPageWriteMessage(final boolean useLong) {
super(PacketImpl.REPLICATION_PAGE_WRITE);
+ this.useLong = useLong;
}
- public ReplicationPageWriteMessage(final PagedMessage pagedMessage, final
int pageNumber) {
- this();
+ public ReplicationPageWriteMessage(final PagedMessage pagedMessage, final
long pageNumber, final boolean useLong) {
+ this(useLong);
this.pageNumber = pageNumber;
this.pagedMessage = pagedMessage;
}
@Override
public int expectedEncodeSize() {
- return PACKET_HEADERS_SIZE +
- DataConstants.SIZE_INT + // buffer.writeInt(pageNumber);
- pagedMessage.getEncodeSize(); // pagedMessage.encode(buffer);
+ if (useLong) {
+ return PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + //
buffer.writeLong(pageNumber);
+ pagedMessage.getEncodeSize(); // pagedMessage.encode(buffer);
+ } else {
+ return PACKET_HEADERS_SIZE + DataConstants.SIZE_INT + //
buffer.writeInt(pageNumber);
+ pagedMessage.getEncodeSize(); // pagedMessage.encode(buffer);
+ }
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
- buffer.writeInt(pageNumber);
+ if (useLong) {
+ buffer.writeLong(pageNumber);
+ } else {
+ buffer.writeInt((int) pageNumber);
Review Comment:
Ditto re: validating that the value fits before the (int) cast.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java:
##########
@@ -61,18 +57,21 @@
import
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
-import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
-import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.jboss.logging.Logger;
import static
org.apache.activemq.artemis.core.server.impl.QueueImpl.DELIVERY_TIMEOUT;
public final class PageSubscriptionImpl implements PageSubscription {
private static final Logger logger =
Logger.getLogger(PageSubscriptionImpl.class);
+ private static final Logger cursorLogger =
Logger.getLogger(CursorIterator.class);
Review Comment:
TODO
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java:
##########
@@ -543,77 +255,105 @@ public boolean delete(final PagedMessage[] messages)
throws Exception {
logger.debugf("Deleting pageNr=%d on store %s", pageId, storeName);
}
- final List<Long> largeMessageIds;
- if (messages != null && messages.length > 0) {
- largeMessageIds = new ArrayList<>();
- for (PagedMessage msg : messages) {
- if ((msg.getMessage()).isLargeMessage()) {
- // this will trigger large message delete: no need to do it
- // for non-large messages!
- msg.getMessage().usageDown();
- largeMessageIds.add(msg.getMessage().getMessageID());
+ if (messages != null) {
+ try (LinkedListIterator<PagedMessage> iter = messages.iterator()) {
+ while (iter.hasNext()) {
+ PagedMessage msg = iter.next();
+ if ((msg.getMessage()).isLargeMessage()) {
+ ((LargeServerMessage)(msg.getMessage())).deleteFile();
+ msg.getMessage().usageDown();
+ }
}
}
- } else {
- largeMessageIds = Collections.emptyList();
}
- try {
- if (!storageManager.waitOnOperations(5000)) {
-
ActiveMQServerLogger.LOGGER.timedOutWaitingForLargeMessagesDeletion(largeMessageIds);
+ storageManager.afterCompleteOperations(new IOCallback() {
+ @Override
+ public void done() {
+ try {
+ if (suspiciousRecords) {
+ ActiveMQServerLogger.LOGGER.pageInvalid(file.getFileName(),
file.getFileName());
+ file.renameTo(file.getFileName() + ".invalidPage");
+ } else {
+ file.delete();
+ }
+ referenceCounter.reset();
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.pageDeleteError(e);
+ }
}
- if (suspiciousRecords) {
- ActiveMQServerLogger.LOGGER.pageInvalid(file.getFileName(),
file.getFileName());
- file.renameTo(file.getFileName() + ".invalidPage");
- } else {
- file.delete();
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+
}
+ });
- return true;
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.pageDeleteError(e);
- return false;
+ return true;
+ }
+
+ public int readNumberOfMessages() throws Exception {
+ boolean wasOpen = isOpen();
+
+ if (!wasOpen) {
+ if (!open(false)) {
+ return 0;
+ }
+ }
+
+ try {
+ int numberOfMessages =
PageReadWriter.readFromSequentialFile(this.storageManager,
+
this.storeName,
+
this.fileFactory,
+
this.file,
+
this.pageId,
+ null,
+
PageReadWriter.SKIP_ALL,
+ null,
+ null);
+ if (logger.isDebugEnabled()) {
+ logger.debug(">>> Reading numberOfMessages page " + this.pageId +
", returning " + numberOfMessages);
+ }
+ return numberOfMessages;
+ } finally {
+ if (!wasOpen) {
+ close(false);
+ }
}
}
public int getNumberOfMessages() {
- return numberOfMessages.intValue();
+ return numberOfMessages;
Review Comment:
It seems a bit odd that the Page's numberOfMessages (and, below, size) was
stored in an AtomicInteger before and is still updated by synchronized methods
now, yet the getters are not synchronized and the field is no longer even
volatile.
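One possible shape, sketched under assumptions: `PageCounters` and
`addMessage` are hypothetical names, and the real Page class may have
constraints not visible in this hunk. Writers stay synchronized so the
increment is atomic, and the field is volatile so unsynchronized getters on
other threads are guaranteed to see the latest write.

```java
final class PageCounters {

   // volatile: unsynchronized readers observe the most recent write
   private volatile int numberOfMessages;

   // synchronized: the read-modify-write increment is atomic across writers
   synchronized void addMessage() {
      numberOfMessages++;
   }

   int getNumberOfMessages() {
      return numberOfMessages;
   }
}
```

If the getters are only ever called from the same thread that does the
updates, none of this is needed, but that would be worth stating in a comment.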
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]