This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 46060b8  ARTEMIS-2676 PageCursorProviderImpl::cleanup can save 
decoding pages without large messages
     new ceceb66  This closes #3044
46060b8 is described below

commit 46060b89ffc3d8899c46e8d80438c50c8b76c3e7
Author: Francesco Nigro <[email protected]>
AuthorDate: Mon Mar 23 13:01:38 2020 +0100

    ARTEMIS-2676 PageCursorProviderImpl::cleanup can save decoding pages 
without large messages
---
 .../paging/cursor/impl/PageCursorProviderImpl.java |  5 +-
 .../activemq/artemis/core/paging/impl/Page.java    | 79 +++++++++++++++-------
 .../artemis/core/paging/impl/PagedMessageImpl.java | 63 ++++++++++++++---
 .../tests/unit/core/paging/impl/PageTest.java      |  8 ++-
 4 files changed, 117 insertions(+), 38 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 1990633..0a8168d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -544,7 +544,7 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
                List<PagedMessage> pgdMessagesList = null;
                try {
                   depagedPage.open();
-                  pgdMessagesList = depagedPage.read(storageManager);
+                  pgdMessagesList = depagedPage.read(storageManager, true);
                } finally {
                   try {
                      depagedPage.close(false, false);
@@ -553,7 +553,8 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
 
                   storageManager.afterPageRead();
                }
-               pgdMessages = pgdMessagesList.toArray(new 
PagedMessage[pgdMessagesList.size()]);
+               pgdMessages = pgdMessagesList.isEmpty() ? null :
+                  pgdMessagesList.toArray(new 
PagedMessage[pgdMessagesList.size()]);
             } else {
                pgdMessages = cache.getMessages();
             }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 3aad184..badfabf 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.impl;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -210,6 +211,7 @@ public final class Page implements Comparable<Page> {
                      readFileBuffer.position(endPosition + 1);
                      assert readFileBuffer.get(endPosition) == Page.END_BYTE : 
"decoding cannot change end byte";
                      msg.initMessage(storageManager);
+                     assert validateLargeMessageStorageManager(msg);
                      if (logger.isTraceEnabled()) {
                         logger.tracef("Reading message %s on pageId=%d for 
address=%s", msg, pageId, storeName);
                      }
@@ -246,8 +248,13 @@ public final class Page implements Comparable<Page> {
    }
 
    public synchronized List<PagedMessage> read(StorageManager storage) throws 
Exception {
+      return read(storage, false);
+   }
+
+   public synchronized List<PagedMessage> read(StorageManager storage, boolean 
onlyLargeMessages) throws Exception {
       if (logger.isDebugEnabled()) {
-         logger.debug("reading page " + this.pageId + " on address = " + 
storeName);
+         logger.debugf("reading page %d on address = %s onlyLargeMessages = 
%b", storeName, pageId,
+                       storage, onlyLargeMessages);
       }
 
       if (!file.isOpen()) {
@@ -256,9 +263,11 @@ public final class Page implements Comparable<Page> {
 
       size.lazySet((int) file.size());
 
-      final List<PagedMessage> messages = readFromSequentialFile(storage);
+      final List<PagedMessage> messages = new ArrayList<>();
+
+      final int totalMessageCount = readFromSequentialFile(storage, messages, 
onlyLargeMessages);
 
-      numberOfMessages.lazySet(messages.size());
+      numberOfMessages.lazySet(totalMessageCount);
 
       return messages;
    }
@@ -316,6 +325,14 @@ public final class Page implements Comparable<Page> {
       return fileBuffer;
    }
 
+   private static boolean validateLargeMessageStorageManager(PagedMessage msg) 
{
+      if (!(msg.getMessage() instanceof LargeServerMessage)) {
+         return true;
+      }
+      LargeServerMessage largeServerMessage = ((LargeServerMessage) 
msg.getMessage());
+      return largeServerMessage.getStorageManager() != null;
+   }
+
    private static ChannelBufferWrapper wrapWhole(ByteBuffer fileBuffer) {
       final int position = fileBuffer.position();
       final int limit = fileBuffer.limit();
@@ -340,13 +357,15 @@ public final class Page implements Comparable<Page> {
    private static final int HEADER_SIZE = HEADER_AND_TRAILER_SIZE - 1;
    private static final int MIN_CHUNK_SIZE = Env.osPageSize();
 
-   private List<PagedMessage> readFromSequentialFile(StorageManager storage) 
throws Exception {
-      final List<PagedMessage> messages = new ArrayList<>();
+   private int readFromSequentialFile(StorageManager storage,
+                                                     List<PagedMessage> 
messages,
+                                                     boolean 
onlyLargeMessages) throws Exception {
       final int fileSize = (int) file.size();
       file.position(0);
       int processedBytes = 0;
       ByteBuffer fileBuffer = null;
       ChannelBufferWrapper fileBufferWrapper;
+      int totalMessageCount = 0;
       try {
          int remainingBytes = fileSize - processedBytes;
          if (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) {
@@ -376,29 +395,38 @@ public final class Page implements Comparable<Page> {
                      final int endPosition = fileBuffer.position() + 
encodedSize;
                      //this check must be performed upfront decoding
                      if (fileBuffer.remaining() >= (encodedSize + 1) && 
fileBuffer.get(endPosition) == Page.END_BYTE) {
-                        final PagedMessageImpl msg = new 
PagedMessageImpl(encodedSize, storageManager);
                         fileBufferWrapper.setIndex(fileBuffer.position(), 
endPosition);
-                        msg.decode(fileBufferWrapper);
-                        fileBuffer.position(endPosition + 1);
-                        assert fileBuffer.get(endPosition) == Page.END_BYTE : 
"decoding cannot change end byte";
-                        msg.initMessage(storage);
-                        assert msg.getMessage() instanceof LargeServerMessage 
&& ((LargeServerMessage)msg.getMessage()).getStorageManager() != null || 
!(msg.getMessage() instanceof LargeServerMessage);
-                        if (logger.isTraceEnabled()) {
-                           logger.tracef("Reading message %s on pageId=%d for 
address=%s", msg, pageId, storeName);
+                        final boolean skipMessage;
+                        if (onlyLargeMessages) {
+                           skipMessage = 
!PagedMessageImpl.isLargeMessage(fileBufferWrapper);
+                        } else {
+                           skipMessage = false;
                         }
-                        messages.add(msg);
+                        if (!skipMessage) {
+                           final PagedMessageImpl msg = new 
PagedMessageImpl(encodedSize, storageManager);
+                           msg.decode(fileBufferWrapper);
+                           assert fileBuffer.get(endPosition) == Page.END_BYTE 
: "decoding cannot change end byte";
+                           msg.initMessage(storage);
+                           assert validateLargeMessageStorageManager(msg);
+                           if (logger.isTraceEnabled()) {
+                              logger.tracef("Reading message %s on pageId=%d 
for address=%s", msg, pageId, storeName);
+                           }
+                           messages.add(msg);
+                        }
+                        totalMessageCount++;
+                        fileBuffer.position(endPosition + 1);
                         processedBytes = nextPosition;
                      } else {
-                        markFileAsSuspect(file.getFileName(), processedBytes, 
messages.size());
-                        return messages;
+                        markFileAsSuspect(file.getFileName(), processedBytes, 
totalMessageCount + 1);
+                        return totalMessageCount;
                      }
                   } else {
-                     markFileAsSuspect(file.getFileName(), processedBytes, 
messages.size());
-                     return messages;
+                     markFileAsSuspect(file.getFileName(), processedBytes, 
totalMessageCount + 1);
+                     return totalMessageCount;
                   }
                } else {
-                  markFileAsSuspect(file.getFileName(), processedBytes, 
messages.size());
-                  return messages;
+                  markFileAsSuspect(file.getFileName(), processedBytes, 
totalMessageCount + 1);
+                  return totalMessageCount;
                }
                remainingBytes = fileSize - processedBytes;
             }
@@ -408,7 +436,7 @@ public final class Page implements Comparable<Page> {
          if (logger.isTraceEnabled()) {
             logger.tracef("%s has %d bytes of unknown data at position = %d", 
file.getFileName(), remainingBytes, processedBytes);
          }
-         return messages;
+         return totalMessageCount;
       } finally {
          if (fileBuffer != null) {
             fileFactory.releaseBuffer(fileBuffer);
@@ -500,11 +528,12 @@ public final class Page implements Comparable<Page> {
       }
 
       if (logger.isDebugEnabled()) {
-         logger.debug("Deleting pageNr=" + pageId + " on store " + storeName);
+         logger.debugf("Deleting pageNr=%d on store %d", pageId, storeName);
       }
 
-      List<Long> largeMessageIds = new ArrayList<>();
-      if (messages != null) {
+      final List<Long> largeMessageIds;
+      if (messages != null && messages.length > 0) {
+         largeMessageIds = new ArrayList<>();
          for (PagedMessage msg : messages) {
             if ((msg.getMessage()).isLargeMessage()) {
                // this will trigger large message delete: no need to do it
@@ -513,6 +542,8 @@ public final class Page implements Comparable<Page> {
                largeMessageIds.add(msg.getMessage().getMessageID());
             }
          }
+      } else {
+         largeMessageIds = Collections.emptyList();
       }
 
       try {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
index b7b49df..a643862 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
@@ -35,6 +35,43 @@ import org.apache.activemq.artemis.utils.DataConstants;
  */
 public class PagedMessageImpl implements PagedMessage {
 
+   // It encapsulates the logic to detect large message types
+   private static final class LargeMessageType {
+
+      private static final byte NONE = 0;
+      private static final byte CORE = 1;
+      private static final byte NOT_CORE = 2;
+
+      public static boolean isLargeMessage(byte encodedValue) {
+         switch (encodedValue) {
+            case LargeMessageType.NONE:
+               return false;
+            case LargeMessageType.CORE:
+            case LargeMessageType.NOT_CORE:
+               return true;
+            default:
+               throw new IllegalStateException("This largeMessageType isn't 
supported: " + encodedValue);
+         }
+      }
+
+      public static boolean isCoreLargeMessage(Message message) {
+         return message.isLargeMessage() && message instanceof ICoreMessage;
+      }
+
+      public static boolean isCoreLargeMessageType(byte encodedValue) {
+         return encodedValue == LargeMessageType.CORE;
+      }
+
+      public static byte valueOf(Message message) {
+         if (!message.isLargeMessage()) {
+            return NONE;
+         }
+         if (message instanceof ICoreMessage) {
+            return CORE;
+         }
+         return NOT_CORE;
+      }
+   }
    /**
     * Large messages will need to be instantiated lazily during getMessage 
when the StorageManager
     * is available
@@ -116,13 +153,21 @@ public class PagedMessageImpl implements PagedMessage {
 
    // EncodingSupport implementation --------------------------------
 
+   /**
+    * This method won't move the {@link ActiveMQBuffer#readerIndex()} of 
{@code buffer}.
+    */
+   public static boolean isLargeMessage(ActiveMQBuffer buffer) {
+      // skip transactionID
+      return 
LargeMessageType.isLargeMessage(buffer.getByte(buffer.readerIndex() + 
Long.BYTES));
+   }
+
    @Override
    public void decode(final ActiveMQBuffer buffer) {
       transactionID = buffer.readLong();
 
-      boolean isLargeMessage = buffer.readBoolean();
+      boolean isCoreLargeMessage = 
LargeMessageType.isCoreLargeMessageType(buffer.readByte());
 
-      if (isLargeMessage) {
+      if (isCoreLargeMessage) {
          int largeMessageHeaderSize = buffer.readInt();
 
          if (storageManager == null) {
@@ -155,12 +200,12 @@ public class PagedMessageImpl implements PagedMessage {
    public void encode(final ActiveMQBuffer buffer) {
       buffer.writeLong(transactionID);
 
-      boolean isLargeMessage = isLargeMessage();
+      byte largeMessageType = LargeMessageType.valueOf(message);
 
-      buffer.writeBoolean(isLargeMessage);
+      buffer.writeByte(largeMessageType);
 
-      if (isLargeMessage) {
-         
buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message));
+      if (LargeMessageType.isCoreLargeMessageType(largeMessageType)) {
+         
buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)
 message));
          LargeMessagePersister.getInstance().encode(buffer, 
(LargeServerMessage) message);
       } else {
          message.getPersister().encode(buffer, message);
@@ -173,13 +218,9 @@ public class PagedMessageImpl implements PagedMessage {
       }
    }
 
-   public boolean isLargeMessage() {
-      return message instanceof ICoreMessage && 
((ICoreMessage)message).isLargeMessage();
-   }
-
    @Override
    public int getEncodeSize() {
-      if (isLargeMessage()) {
+      if (LargeMessageType.isCoreLargeMessage(message)) {
          return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + 
DataConstants.SIZE_INT + 
LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message) +
             DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
       } else {
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
index 1aa28c2..894651e 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
@@ -150,7 +150,7 @@ public class PageTest extends ActiveMQTestBase {
       file.open();
       page = new Page(new SimpleString("something"), storageManager, factory, 
file, 10);
 
-      List<PagedMessage> msgs = page.read(storageManager);
+      List<PagedMessage> msgs = page.read(storageManager, largeMessages);
 
       Assert.assertEquals(numberOfElements, msgs.size());
 
@@ -164,6 +164,12 @@ public class PageTest extends ActiveMQTestBase {
          Assert.assertEquals(largeMessages ? 1 : 0, 
pagedMessage.getMessage().getUsage());
       }
 
+      if (!largeMessages) {
+         Page tmpPage = new Page(new SimpleString("something"), 
storageManager, factory, file, 10);
+         Assert.assertEquals(0, tmpPage.read(storageManager, true).size());
+         Assert.assertEquals(numberOfElements, tmpPage.getNumberOfMessages());
+      }
+
       Assert.assertTrue(page.delete(msgs.toArray(new 
PagedMessage[msgs.size()])));
 
       for (PagedMessage pagedMessage : msgs) {

Reply via email to