This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 46060b8 ARTEMIS-2676 PageCursorProviderImpl::cleanup can save
decoding pages without large messages
new ceceb66 This closes #3044
46060b8 is described below
commit 46060b89ffc3d8899c46e8d80438c50c8b76c3e7
Author: Francesco Nigro <[email protected]>
AuthorDate: Mon Mar 23 13:01:38 2020 +0100
ARTEMIS-2676 PageCursorProviderImpl::cleanup can save decoding pages
without large messages
---
.../paging/cursor/impl/PageCursorProviderImpl.java | 5 +-
.../activemq/artemis/core/paging/impl/Page.java | 79 +++++++++++++++-------
.../artemis/core/paging/impl/PagedMessageImpl.java | 63 ++++++++++++++---
.../tests/unit/core/paging/impl/PageTest.java | 8 ++-
4 files changed, 117 insertions(+), 38 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 1990633..0a8168d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -544,7 +544,7 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
List<PagedMessage> pgdMessagesList = null;
try {
depagedPage.open();
- pgdMessagesList = depagedPage.read(storageManager);
+ pgdMessagesList = depagedPage.read(storageManager, true);
} finally {
try {
depagedPage.close(false, false);
@@ -553,7 +553,8 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
storageManager.afterPageRead();
}
- pgdMessages = pgdMessagesList.toArray(new
PagedMessage[pgdMessagesList.size()]);
+ pgdMessages = pgdMessagesList.isEmpty() ? null :
+ pgdMessagesList.toArray(new
PagedMessage[pgdMessagesList.size()]);
} else {
pgdMessages = cache.getMessages();
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 3aad184..badfabf 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -210,6 +211,7 @@ public final class Page implements Comparable<Page> {
readFileBuffer.position(endPosition + 1);
assert readFileBuffer.get(endPosition) == Page.END_BYTE :
"decoding cannot change end byte";
msg.initMessage(storageManager);
+ assert validateLargeMessageStorageManager(msg);
if (logger.isTraceEnabled()) {
logger.tracef("Reading message %s on pageId=%d for
address=%s", msg, pageId, storeName);
}
@@ -246,8 +248,13 @@ public final class Page implements Comparable<Page> {
}
public synchronized List<PagedMessage> read(StorageManager storage) throws
Exception {
+ return read(storage, false);
+ }
+
+ public synchronized List<PagedMessage> read(StorageManager storage, boolean
onlyLargeMessages) throws Exception {
if (logger.isDebugEnabled()) {
- logger.debug("reading page " + this.pageId + " on address = " +
storeName);
+ logger.debugf("reading page %d on address = %s onlyLargeMessages =
%b", pageId, storeName,
+ onlyLargeMessages); // arguments listed in placeholder order: %d, %s, %b
}
if (!file.isOpen()) {
@@ -256,9 +263,11 @@ public final class Page implements Comparable<Page> {
size.lazySet((int) file.size());
- final List<PagedMessage> messages = readFromSequentialFile(storage);
+ final List<PagedMessage> messages = new ArrayList<>();
+
+ final int totalMessageCount = readFromSequentialFile(storage, messages,
onlyLargeMessages);
- numberOfMessages.lazySet(messages.size());
+ numberOfMessages.lazySet(totalMessageCount);
return messages;
}
@@ -316,6 +325,14 @@ public final class Page implements Comparable<Page> {
return fileBuffer;
}
+ private static boolean validateLargeMessageStorageManager(PagedMessage msg)
{
+ if (!(msg.getMessage() instanceof LargeServerMessage)) {
+ return true;
+ }
+ LargeServerMessage largeServerMessage = ((LargeServerMessage)
msg.getMessage());
+ return largeServerMessage.getStorageManager() != null;
+ }
+
private static ChannelBufferWrapper wrapWhole(ByteBuffer fileBuffer) {
final int position = fileBuffer.position();
final int limit = fileBuffer.limit();
@@ -340,13 +357,15 @@ public final class Page implements Comparable<Page> {
private static final int HEADER_SIZE = HEADER_AND_TRAILER_SIZE - 1;
private static final int MIN_CHUNK_SIZE = Env.osPageSize();
- private List<PagedMessage> readFromSequentialFile(StorageManager storage)
throws Exception {
- final List<PagedMessage> messages = new ArrayList<>();
+ private int readFromSequentialFile(StorageManager storage,
+ List<PagedMessage>
messages,
+ boolean
onlyLargeMessages) throws Exception {
final int fileSize = (int) file.size();
file.position(0);
int processedBytes = 0;
ByteBuffer fileBuffer = null;
ChannelBufferWrapper fileBufferWrapper;
+ int totalMessageCount = 0;
try {
int remainingBytes = fileSize - processedBytes;
if (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) {
@@ -376,29 +395,38 @@ public final class Page implements Comparable<Page> {
final int endPosition = fileBuffer.position() +
encodedSize;
//this check must be performed upfront decoding
if (fileBuffer.remaining() >= (encodedSize + 1) &&
fileBuffer.get(endPosition) == Page.END_BYTE) {
- final PagedMessageImpl msg = new
PagedMessageImpl(encodedSize, storageManager);
fileBufferWrapper.setIndex(fileBuffer.position(),
endPosition);
- msg.decode(fileBufferWrapper);
- fileBuffer.position(endPosition + 1);
- assert fileBuffer.get(endPosition) == Page.END_BYTE :
"decoding cannot change end byte";
- msg.initMessage(storage);
- assert msg.getMessage() instanceof LargeServerMessage
&& ((LargeServerMessage)msg.getMessage()).getStorageManager() != null ||
!(msg.getMessage() instanceof LargeServerMessage);
- if (logger.isTraceEnabled()) {
- logger.tracef("Reading message %s on pageId=%d for
address=%s", msg, pageId, storeName);
+ final boolean skipMessage;
+ if (onlyLargeMessages) {
+ skipMessage =
!PagedMessageImpl.isLargeMessage(fileBufferWrapper);
+ } else {
+ skipMessage = false;
}
- messages.add(msg);
+ if (!skipMessage) {
+ final PagedMessageImpl msg = new
PagedMessageImpl(encodedSize, storageManager);
+ msg.decode(fileBufferWrapper);
+ assert fileBuffer.get(endPosition) == Page.END_BYTE
: "decoding cannot change end byte";
+ msg.initMessage(storage);
+ assert validateLargeMessageStorageManager(msg);
+ if (logger.isTraceEnabled()) {
+ logger.tracef("Reading message %s on pageId=%d
for address=%s", msg, pageId, storeName);
+ }
+ messages.add(msg);
+ }
+ totalMessageCount++;
+ fileBuffer.position(endPosition + 1);
processedBytes = nextPosition;
} else {
- markFileAsSuspect(file.getFileName(), processedBytes,
messages.size());
- return messages;
+ markFileAsSuspect(file.getFileName(), processedBytes,
totalMessageCount + 1);
+ return totalMessageCount;
}
} else {
- markFileAsSuspect(file.getFileName(), processedBytes,
messages.size());
- return messages;
+ markFileAsSuspect(file.getFileName(), processedBytes,
totalMessageCount + 1);
+ return totalMessageCount;
}
} else {
- markFileAsSuspect(file.getFileName(), processedBytes,
messages.size());
- return messages;
+ markFileAsSuspect(file.getFileName(), processedBytes,
totalMessageCount + 1);
+ return totalMessageCount;
}
remainingBytes = fileSize - processedBytes;
}
@@ -408,7 +436,7 @@ public final class Page implements Comparable<Page> {
if (logger.isTraceEnabled()) {
logger.tracef("%s has %d bytes of unknown data at position = %d",
file.getFileName(), remainingBytes, processedBytes);
}
- return messages;
+ return totalMessageCount;
} finally {
if (fileBuffer != null) {
fileFactory.releaseBuffer(fileBuffer);
@@ -500,11 +528,12 @@ public final class Page implements Comparable<Page> {
}
if (logger.isDebugEnabled()) {
- logger.debug("Deleting pageNr=" + pageId + " on store " + storeName);
+ logger.debugf("Deleting pageNr=%d on store %s", pageId, storeName); // %s for storeName, as in this class's trace log
}
- List<Long> largeMessageIds = new ArrayList<>();
- if (messages != null) {
+ final List<Long> largeMessageIds;
+ if (messages != null && messages.length > 0) {
+ largeMessageIds = new ArrayList<>();
for (PagedMessage msg : messages) {
if ((msg.getMessage()).isLargeMessage()) {
// this will trigger large message delete: no need to do it
@@ -513,6 +542,8 @@ public final class Page implements Comparable<Page> {
largeMessageIds.add(msg.getMessage().getMessageID());
}
}
+ } else {
+ largeMessageIds = Collections.emptyList();
}
try {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
index b7b49df..a643862 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
@@ -35,6 +35,43 @@ import org.apache.activemq.artemis.utils.DataConstants;
*/
public class PagedMessageImpl implements PagedMessage {
+ // It encapsulates the logic to detect large message types
+ private static final class LargeMessageType {
+
+ private static final byte NONE = 0;
+ private static final byte CORE = 1;
+ private static final byte NOT_CORE = 2;
+
+ public static boolean isLargeMessage(byte encodedValue) {
+ switch (encodedValue) {
+ case LargeMessageType.NONE:
+ return false;
+ case LargeMessageType.CORE:
+ case LargeMessageType.NOT_CORE:
+ return true;
+ default:
+ throw new IllegalStateException("This largeMessageType isn't
supported: " + encodedValue);
+ }
+ }
+
+ public static boolean isCoreLargeMessage(Message message) {
+ return message.isLargeMessage() && message instanceof ICoreMessage;
+ }
+
+ public static boolean isCoreLargeMessageType(byte encodedValue) {
+ return encodedValue == LargeMessageType.CORE;
+ }
+
+ public static byte valueOf(Message message) {
+ if (!message.isLargeMessage()) {
+ return NONE;
+ }
+ if (message instanceof ICoreMessage) {
+ return CORE;
+ }
+ return NOT_CORE;
+ }
+ }
/**
* Large messages will need to be instantiated lazily during getMessage
when the StorageManager
* is available
@@ -116,13 +153,21 @@ public class PagedMessageImpl implements PagedMessage {
// EncodingSupport implementation --------------------------------
+ /**
+ * This method won't move the {@link ActiveMQBuffer#readerIndex()} of
{@code buffer}.
+ */
+ public static boolean isLargeMessage(ActiveMQBuffer buffer) {
+ // skip transactionID
+ return
LargeMessageType.isLargeMessage(buffer.getByte(buffer.readerIndex() +
Long.BYTES));
+ }
+
@Override
public void decode(final ActiveMQBuffer buffer) {
transactionID = buffer.readLong();
- boolean isLargeMessage = buffer.readBoolean();
+ boolean isCoreLargeMessage =
LargeMessageType.isCoreLargeMessageType(buffer.readByte());
- if (isLargeMessage) {
+ if (isCoreLargeMessage) {
int largeMessageHeaderSize = buffer.readInt();
if (storageManager == null) {
@@ -155,12 +200,12 @@ public class PagedMessageImpl implements PagedMessage {
public void encode(final ActiveMQBuffer buffer) {
buffer.writeLong(transactionID);
- boolean isLargeMessage = isLargeMessage();
+ byte largeMessageType = LargeMessageType.valueOf(message);
- buffer.writeBoolean(isLargeMessage);
+ buffer.writeByte(largeMessageType);
- if (isLargeMessage) {
-
buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message));
+ if (LargeMessageType.isCoreLargeMessageType(largeMessageType)) {
+
buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)
message));
LargeMessagePersister.getInstance().encode(buffer,
(LargeServerMessage) message);
} else {
message.getPersister().encode(buffer, message);
@@ -173,13 +218,9 @@ public class PagedMessageImpl implements PagedMessage {
}
}
- public boolean isLargeMessage() {
- return message instanceof ICoreMessage &&
((ICoreMessage)message).isLargeMessage();
- }
-
@Override
public int getEncodeSize() {
- if (isLargeMessage()) {
+ if (LargeMessageType.isCoreLargeMessage(message)) {
return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE +
DataConstants.SIZE_INT +
LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message) +
DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
} else {
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
index 1aa28c2..894651e 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
@@ -150,7 +150,7 @@ public class PageTest extends ActiveMQTestBase {
file.open();
page = new Page(new SimpleString("something"), storageManager, factory,
file, 10);
- List<PagedMessage> msgs = page.read(storageManager);
+ List<PagedMessage> msgs = page.read(storageManager, largeMessages);
Assert.assertEquals(numberOfElements, msgs.size());
@@ -164,6 +164,12 @@ public class PageTest extends ActiveMQTestBase {
Assert.assertEquals(largeMessages ? 1 : 0,
pagedMessage.getMessage().getUsage());
}
+ if (!largeMessages) {
+ Page tmpPage = new Page(new SimpleString("something"),
storageManager, factory, file, 10);
+ Assert.assertEquals(0, tmpPage.read(storageManager, true).size());
+ Assert.assertEquals(numberOfElements, tmpPage.getNumberOfMessages());
+ }
+
Assert.assertTrue(page.delete(msgs.toArray(new
PagedMessage[msgs.size()])));
for (PagedMessage pagedMessage : msgs) {