gemmellr commented on code in PR #4101:
URL: https://github.com/apache/activemq-artemis/pull/4101#discussion_r891298940
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java:
##########
@@ -543,77 +283,110 @@ public boolean delete(final PagedMessage[] messages)
throws Exception {
logger.debugf("Deleting pageNr=%d on store %s", pageId, storeName);
}
- final List<Long> largeMessageIds;
- if (messages != null && messages.length > 0) {
- largeMessageIds = new ArrayList<>();
- for (PagedMessage msg : messages) {
- if ((msg.getMessage()).isLargeMessage()) {
- // this will trigger large message delete: no need to do it
- // for non-large messages!
- msg.getMessage().usageDown();
- largeMessageIds.add(msg.getMessage().getMessageID());
+ if (messages != null) {
+ try (LinkedListIterator<PagedMessage> iter = messages.iterator()) {
+ while (iter.hasNext()) {
+ PagedMessage msg = iter.next();
+ if ((msg.getMessage()).isLargeMessage()) {
+ ((LargeServerMessage)(msg.getMessage())).deleteFile();
+ msg.getMessage().usageDown();
+ }
}
}
- } else {
- largeMessageIds = Collections.emptyList();
}
- try {
- if (!storageManager.waitOnOperations(5000)) {
-
ActiveMQServerLogger.LOGGER.timedOutWaitingForLargeMessagesDeletion(largeMessageIds);
+ storageManager.afterCompleteOperations(new IOCallback() {
+ @Override
+ public void done() {
+ try {
+ if (suspiciousRecords) {
+ ActiveMQServerLogger.LOGGER.pageInvalid(file.getFileName(),
file.getFileName());
+ file.renameTo(file.getFileName() + ".invalidPage");
+ } else {
+ file.delete();
+ }
+ referenceCounter.reset();
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.pageDeleteError(e);
+ }
}
- if (suspiciousRecords) {
- ActiveMQServerLogger.LOGGER.pageInvalid(file.getFileName(),
file.getFileName());
- file.renameTo(file.getFileName() + ".invalidPage");
- } else {
- file.delete();
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+
}
+ });
- return true;
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.pageDeleteError(e);
- return false;
+ return true;
+ }
+
+ public int readNumberOfMessages() throws Exception {
+ boolean wasOpen = isOpen();
+
+ if (!wasOpen) {
+ if (!open(false)) {
+ return 0;
+ }
+ }
+
+ try {
+ int numberOfMessages =
PageReadWriter.readFromSequentialFile(this.storageManager,
+
this.storeName,
+
this.fileFactory,
+
this.file,
+
this.pageId,
+ null,
+
PageReadWriter.SKIP_ALL,
+ null,
+ null);
+ if (logger.isDebugEnabled()) {
+ logger.debug(">>> Reading numberOfMessages page " + this.pageId +
", returning " + numberOfMessages);
+ }
+ return numberOfMessages;
+ } finally {
+ if (!wasOpen) {
+ close(false);
+ }
}
}
public int getNumberOfMessages() {
- return numberOfMessages.intValue();
+ return numberOfMessages;
}
public int getSize() {
- return size.intValue();
+ return size;
}
- @Override
- public String toString() {
- return "Page::pageNr=" + this.pageId + ", file=" + this.file;
+ private void setSize(int size) {
+ this.size = size;
}
@Override
- public int compareTo(Page otherPage) {
- return otherPage.getPageId() - this.pageId;
+ public String toString() {
+ return "Page::seqCreation=" + seqInt + ", pageNr=" + this.pageId + ",
file=" + this.file;
}
@Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + pageId;
- return result;
+ public int compareTo(Page o) {
+ return 0;
Review Comment:
This seems odd. If its no longer comparable, perhaps remove the interface?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java:
##########
@@ -17,57 +17,80 @@
package org.apache.activemq.artemis.core.paging.impl;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.paging.PagedMessage;
-import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.Env;
+import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.EmptyList;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.jboss.logging.Logger;
public final class Page implements Comparable<Page> {
+ private static final AtomicInteger factory = new AtomicInteger(0);
+
+ private final int seqInt = factory.incrementAndGet();
+
private static final Logger logger = Logger.getLogger(Page.class);
- public static final int SIZE_RECORD = DataConstants.SIZE_BYTE +
DataConstants.SIZE_INT + DataConstants.SIZE_BYTE;
+ private final ReferenceCounterUtil referenceCounter = new
ReferenceCounterUtil();
- private static final byte START_BYTE = (byte) '{';
+ public void usageReset() {
+ referenceCounter.reset();
+ }
- private static final byte END_BYTE = (byte) '}';
+ public int usageUp() {
+ return referenceCounter.increment();
+ }
+
+ public int usageDown() {
+ return referenceCounter.decrement();
+ }
- private final int pageId;
+ /** This is an utility method to help you call usageDown while using a try
(closeable) call.
+ * */
+ public AutoCloseable refCloseable() {
+ return referenceCounter;
+ }
+
+ /** to be called when the page is supposed to be released */
+ public void releaseTask(Consumer<Page> releaseTask) {
+ referenceCounter.setTask(() -> releaseTask.accept(this));
+ }
+
+ private final long pageId;
private boolean suspiciousRecords = false;
- private final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ private int numberOfMessages;
private final SequentialFile file;
private final SequentialFileFactory fileFactory;
+ private volatile LinkedList<PagedMessage> messages;
+
/**
* The page cache that will be filled with data as we write more data
*/
- private volatile LivePageCache pageCache;
+ private volatile Consumer<PagedMessage> callback;
Review Comment:
Comment above is inaccurate now
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java:
##########
@@ -628,6 +636,24 @@ public AddressSettings setMaxSizeBytes(final long
maxSizeBytes) {
return this;
}
+ public int getMaxReadPageMessages() {
+ return maxReadPageMessages != null ? maxReadPageMessages :
AddressSettings.DEFAULT_MAX_READ_PAGE_MESSAGES;
+ }
+
+ public AddressSettings setMaxReadPageMessages(final int
maxReadPageMessages) {
+ this.maxReadPageMessages = maxReadPageMessages;
+ return this;
+ }
+
+ public int getMaxReadPageBytes() {
+ return maxReadPageBytes != null ? maxReadPageBytes : 2 *
getPageSizeBytes();
Review Comment:
Does the doc say this defaults to double the page size? Dont recall seeing
it.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageCache.java:
##########
@@ -0,0 +1,70 @@
+/**
Review Comment:
Should be a comment
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -207,8 +208,8 @@ private void checkIDSupplier(NodeStore<MessageReference>
nodeStore) {
// The quantity of pagedReferences on messageReferences priority list
private final AtomicInteger pagedReferences = new AtomicInteger(0);
- // The estimate of memory being consumed by this queue. Used to calculate
instances of messages to depage
- final AtomicInteger queueMemorySize = new AtomicInteger(0);
+
+ final SizeAwareMetric queueMemorySize = new SizeAwareMetric();
Review Comment:
Ah so it did use a counter before.
##########
docs/user-manual/en/paging.md:
##########
@@ -96,12 +96,20 @@ Property Name|Description|Default
`max-size-messages`|The max number of messages the address could have before
entering on page mode.| -1 (disabled)
`page-size-bytes`|The size of each page file used on the paging system|10MB
`address-full-policy`|This must be set to `PAGE` for paging to enable. If the
value is `PAGE` then further messages will be paged to disk. If the value is
`DROP` then further messages will be silently dropped. If the value is `FAIL`
then the messages will be dropped and the client message producers will receive
an exception. If the value is `BLOCK` then client message producers will block
when they try and send further messages.|`PAGE`
-`page-max-cache-size`|The system will keep up to `page-max-cache-size` page
files in memory to optimize IO during paging navigation.|5
+`max-read-page-messages` | how many message can be read from paging into the
Queue whenever more messages are needed. The system wtill stop reading if
`max-read-page-bytes hits the limit first.
+`max-read-page-bytes` | how much memory the messages read from paging can take
on the Queue whenever more messages are needed. The system will stop reading if
`max-read-page-messages` hits the limit first.
Review Comment:
Defaults would be good.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java:
##########
@@ -49,7 +49,7 @@ public int expectedEncodeSize() {
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
- buffer.writeInt(pageNumber);
+ buffer.writeInt((int)pageNumber);
Review Comment:
Ditto
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java:
##########
@@ -17,57 +17,80 @@
package org.apache.activemq.artemis.core.paging.impl;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.paging.PagedMessage;
-import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.Env;
+import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.EmptyList;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.jboss.logging.Logger;
public final class Page implements Comparable<Page> {
+ private static final AtomicInteger factory = new AtomicInteger(0);
+
+ private final int seqInt = factory.incrementAndGet();
+
private static final Logger logger = Logger.getLogger(Page.class);
- public static final int SIZE_RECORD = DataConstants.SIZE_BYTE +
DataConstants.SIZE_INT + DataConstants.SIZE_BYTE;
+ private final ReferenceCounterUtil referenceCounter = new
ReferenceCounterUtil();
- private static final byte START_BYTE = (byte) '{';
+ public void usageReset() {
+ referenceCounter.reset();
+ }
- private static final byte END_BYTE = (byte) '}';
+ public int usageUp() {
+ return referenceCounter.increment();
+ }
+
+ public int usageDown() {
+ return referenceCounter.decrement();
+ }
- private final int pageId;
+ /** This is an utility method to help you call usageDown while using a try
(closeable) call.
+ * */
Review Comment:
Seems like it could be one line
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java:
##########
@@ -17,57 +17,80 @@
package org.apache.activemq.artemis.core.paging.impl;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.paging.PagedMessage;
-import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.Env;
+import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.EmptyList;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.jboss.logging.Logger;
public final class Page implements Comparable<Page> {
+ private static final AtomicInteger factory = new AtomicInteger(0);
+
+ private final int seqInt = factory.incrementAndGet();
+
Review Comment:
its nice to have the logger at the top and have statics at the top, these
adds make it static, non-static, static...if you put them below the logger it
would be nicer.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageReadWriter.java:
##########
@@ -0,0 +1,281 @@
+/**
Review Comment:
Ditto
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java:
##########
@@ -463,43 +215,40 @@ public void writeDirect(PagedMessage message) throws
Exception {
if (!file.isOpen()) {
throw ActiveMQMessageBundle.BUNDLE.cannotWriteToClosedFile(file);
}
- final int messageEncodedSize = message.getEncodeSize();
- final int bufferSize = messageEncodedSize + Page.SIZE_RECORD;
- final ByteBuffer buffer = fileFactory.newBuffer(bufferSize);
- ChannelBufferWrapper activeMQBuffer = new
ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer));
- activeMQBuffer.clear();
- activeMQBuffer.writeByte(Page.START_BYTE);
- activeMQBuffer.writeInt(messageEncodedSize);
- message.encode(activeMQBuffer);
- activeMQBuffer.writeByte(Page.END_BYTE);
- assert (activeMQBuffer.readableBytes() == bufferSize) :
"messageEncodedSize is different from expected";
- //buffer limit and position are the same
- assert (buffer.remaining() == bufferSize) : "buffer position or limit
are changed";
- file.writeDirect(buffer, false);
- if (pageCache != null) {
- pageCache.addLiveMessage(message);
+ if (callback != null) {
+ callback.accept(message);
}
- //lighter than addAndGet when single writer
- numberOfMessages.lazySet(numberOfMessages.get() + 1);
- size.lazySet(size.get() + bufferSize);
+ addMessage(message);
+ this.size += PageReadWriter.writeMessage(message, fileFactory, file);
+ numberOfMessages++;
}
public void sync() throws Exception {
file.sync();
}
- public void open(boolean createFile) throws Exception {
+ public boolean isOpen() {
+ return file != null && file.isOpen();
+ }
+
+
+ public boolean open(boolean createFile) throws Exception {
+ boolean isOpen = false;
if (!file.isOpen() && (createFile || file.exists())) {
file.open();
+ isOpen = true;
}
if (file.isOpen()) {
- size.set((int) file.size());
+ isOpen = true;
+ size = (int) file.size();
Review Comment:
Should probably validate the value before truncating it?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java:
##########
@@ -1444,20 +1276,39 @@ public List<MessageReference>
getRelatedMessageReferences() {
private class CursorIterator implements PageIterator {
- private PagePositionAndFileOffset position = null;
+ // The cursorLogger is declared as static on the class, just to avoid a
getLogger() call every time
+ private final Logger logger = cursorLogger;
Review Comment:
Why assign the existing static variable to an instance variable, rather than
just using or creating the static logger here?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java:
##########
@@ -54,7 +54,7 @@ public int expectedEncodeSize() {
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(storeName);
- buffer.writeInt(pageNumber);
+ buffer.writeInt((int)pageNumber);
Review Comment:
seems like it should at least validate the number before truncating it...and
then perhaps explode if it would have done the wrong thing?
##########
docs/user-manual/en/paging.md:
##########
@@ -96,12 +96,20 @@ Property Name|Description|Default
`max-size-messages`|The max number of messages the address could have before
entering on page mode.| -1 (disabled)
`page-size-bytes`|The size of each page file used on the paging system|10MB
`address-full-policy`|This must be set to `PAGE` for paging to enable. If the
value is `PAGE` then further messages will be paged to disk. If the value is
`DROP` then further messages will be silently dropped. If the value is `FAIL`
then the messages will be dropped and the client message producers will receive
an exception. If the value is `BLOCK` then client message producers will block
when they try and send further messages.|`PAGE`
-`page-max-cache-size`|The system will keep up to `page-max-cache-size` page
files in memory to optimize IO during paging navigation.|5
+`max-read-page-messages` | how many message can be read from paging into the
Queue whenever more messages are needed. The system wtill stop reading if
`max-read-page-bytes hits the limit first.
+`max-read-page-bytes` | how much memory the messages read from paging can take
on the Queue whenever more messages are needed. The system will stop reading if
`max-read-page-messages` hits the limit first.
+`page-max-cache-size`|Deprecated and not used: `max-read-page-messages` and
`max-read-page-bytes` will replace this functionality.
Review Comment:
Not will replace, but have replaced. Elsewhere I tend to just remove options
from the docs that dont do anything. Is a message logged if the old one is used?
##########
docs/user-manual/en/paging.md:
##########
@@ -96,12 +96,20 @@ Property Name|Description|Default
`max-size-messages`|The max number of messages the address could have before
entering on page mode.| -1 (disabled)
`page-size-bytes`|The size of each page file used on the paging system|10MB
`address-full-policy`|This must be set to `PAGE` for paging to enable. If the
value is `PAGE` then further messages will be paged to disk. If the value is
`DROP` then further messages will be silently dropped. If the value is `FAIL`
then the messages will be dropped and the client message producers will receive
an exception. If the value is `BLOCK` then client message producers will block
when they try and send further messages.|`PAGE`
-`page-max-cache-size`|The system will keep up to `page-max-cache-size` page
files in memory to optimize IO during paging navigation.|5
+`max-read-page-messages` | how many message can be read from paging into the
Queue whenever more messages are needed. The system wtill stop reading if
`max-read-page-bytes hits the limit first.
+`max-read-page-bytes` | how much memory the messages read from paging can take
on the Queue whenever more messages are needed. The system will stop reading if
`max-read-page-messages` hits the limit first.
+`page-max-cache-size`|Deprecated and not used: `max-read-page-messages` and
`max-read-page-bytes` will replace this functionality.
### max-size-bytes and max-size-messages simultaneous usage
It is possible to define max-size-messages (as the maximum number of messages)
and max-messages-size (as the max number of estimated memory used by the
address) concurrently. The configured policy will start based on the first
value to reach its mark.
+#### Maximum read from page
+
+Similarly to `max-size-bytes` and `max-size-messages`, the same can happen
with `max-read-page-bytes` and `max-read-page-messages` when reading messages
from paging.
Review Comment:
The "Similarly to `max-size-bytes` and `max-size-messages` the same can
happen" bit seems superfluous. It would be cleare to just describe the
behaviour of these options rather than comparing to ultimately unrelated
options.
##########
artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderAccessor.java:
##########
@@ -5,20 +5,23 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
Review Comment:
This is wrong, its a comment not javadoc, should be left as is.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageReadWriter.java:
##########
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.Env;
+import org.jboss.logging.Logger;
+
+public class PageReadWriter {
+
+
+ private static Logger logger = Logger.getLogger(PageReadWriter.class);
+
+ public static final int SIZE_RECORD = DataConstants.SIZE_BYTE +
DataConstants.SIZE_INT + DataConstants.SIZE_BYTE;
+
+ private static final byte START_BYTE = (byte) '{';
+
+ private static final byte END_BYTE = (byte) '}';
+
+ //sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE)
+ private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT +
2;
+ private static final int MINIMUM_MSG_PERSISTENT_SIZE =
HEADER_AND_TRAILER_SIZE;
+ private static final int HEADER_SIZE = HEADER_AND_TRAILER_SIZE - 1;
+ private static final int MIN_CHUNK_SIZE = Env.osPageSize();
+
+ public interface SuspectFileCallback {
+ void onSuspect(String fileName, int position, int msgNumber);
+ }
+
+ public interface PageRecordFilter {
+ boolean skip(ActiveMQBuffer buffer);
+ }
+
+ public interface ReadCallback {
+ void readComple(int size);
+ }
+
+ public static final PageRecordFilter ONLY_LARGE = (buffer) ->
!PagedMessageImpl.isLargeMessage(buffer);
+
+ public static final PageRecordFilter NO_SKIP = (buffer) -> false;
+
+ public static final PageRecordFilter SKIP_ALL = (buffer) -> true;
+
+ public static int writeMessage(PagedMessage message, SequentialFileFactory
fileFactory, SequentialFile file) throws Exception {
+ final int messageEncodedSize = message.getEncodeSize();
+ final int bufferSize = messageEncodedSize + SIZE_RECORD;
+ final ByteBuffer buffer = fileFactory.newBuffer(bufferSize);
+ ChannelBufferWrapper activeMQBuffer = new
ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer));
+ activeMQBuffer.clear();
+ activeMQBuffer.writeByte(START_BYTE);
+ activeMQBuffer.writeInt(messageEncodedSize);
+ message.encode(activeMQBuffer);
+ activeMQBuffer.writeByte(END_BYTE);
+ assert (activeMQBuffer.readableBytes() == bufferSize) :
"messageEncodedSize is different from expected";
+ //buffer limit and position are the same
+ assert (buffer.remaining() == bufferSize) : "buffer position or limit
are changed";
+ file.writeDirect(buffer, false);
+ return bufferSize;
+ }
+
+
+
Review Comment:
Its weird that there are all these newlines between methods, but a little
below this, there is a 100 line method that is screaming for some newline
grouping/seperation but has none.
##########
docs/user-manual/en/paging.md:
##########
@@ -96,12 +96,20 @@ Property Name|Description|Default
`max-size-messages`|The max number of messages the address could have before
entering on page mode.| -1 (disabled)
`page-size-bytes`|The size of each page file used on the paging system|10MB
`address-full-policy`|This must be set to `PAGE` for paging to enable. If the
value is `PAGE` then further messages will be paged to disk. If the value is
`DROP` then further messages will be silently dropped. If the value is `FAIL`
then the messages will be dropped and the client message producers will receive
an exception. If the value is `BLOCK` then client message producers will block
when they try and send further messages.|`PAGE`
-`page-max-cache-size`|The system will keep up to `page-max-cache-size` page
files in memory to optimize IO during paging navigation.|5
+`max-read-page-messages` | how many message can be read from paging into the
Queue whenever more messages are needed. The system wtill stop reading if
`max-read-page-bytes hits the limit first.
+`max-read-page-bytes` | how much memory the messages read from paging can take
on the Queue whenever more messages are needed. The system will stop reading if
`max-read-page-messages` hits the limit first.
+`page-max-cache-size`|Deprecated and not used: `max-read-page-messages` and
`max-read-page-bytes` will replace this functionality.
### max-size-bytes and max-size-messages simultaneous usage
It is possible to define max-size-messages (as the maximum number of messages)
and max-messages-size (as the max number of estimated memory used by the
address) concurrently. The configured policy will start based on the first
value to reach its mark.
+#### Maximum read from page
+
+Similarly to `max-size-bytes` and `max-size-messages`, the same can happen
with `max-read-page-bytes` and `max-read-page-messages` when reading messages
from paging.
+
+<b>Warning</b>: When messages are read from paging into memory, when they are
redelivered (for either a rollback or a closed consumers). Messages will stay
in memory until they are delivered again. In these cases of redeliveries it
would be expected to have more messages in memory than the configured maximum
values.
Review Comment:
The first sentence ends prematurely, the second one is really part of the
same sentence.
##########
docs/user-manual/en/paging.md:
##########
@@ -192,6 +200,12 @@ The pages are synced periodically and the sync period is
configured through
the same value of `journal-buffer-timeout`. When using ASYNCIO, the default
should be `3333333`.
+## Memory usage from Paged Messages.
+
+The system should keep at least one paged file in memory caching ahead reading
messages.
+Also every active subscription could keep one paged file in memory. So if your
system has too many queues (some people would call this an horizontal topology).
+So, if your system has too many queues it is recommended to minimize the
page-size.
Review Comment:
third sentence ends prematurely, its really continued by the fourth.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]