ARTEMIS-1650 Improve paged message acknowledge

Cache `messageID`, `transactionID` and `isLargeMessage` in
PagedReference so that acknowledging a message no longer requires
fetching the PagedMessage, which may already have been garbage-collected
and would otherwise force the entire page to be re-read.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/822445a7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/822445a7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/822445a7

Branch: refs/heads/master
Commit: 822445a717f943c64a84b2ac6a0af8ace9e5cd23
Parents: 33b265c
Author: huaishk <shoukunh...@gmail.com>
Authored: Wed Jan 31 09:48:43 2018 +0800
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Thu Feb 8 09:12:57 2018 -0500

----------------------------------------------------------------------
 .../core/paging/cursor/PagedReference.java      |  4 ++++
 .../core/paging/cursor/PagedReferenceImpl.java  | 24 ++++++++++++++++++++
 .../cursor/impl/PageSubscriptionImpl.java       |  4 ++--
 .../artemis/core/server/MessageReference.java   |  2 ++
 .../core/server/impl/LastValueQueue.java        |  5 ++++
 .../core/server/impl/MessageReferenceImpl.java  |  5 ++++
 .../artemis/core/server/impl/RefsOperation.java |  5 +++-
 .../core/server/impl/ServerConsumerImpl.java    |  2 +-
 8 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
index c1ff089..be2d042 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
@@ -24,4 +24,8 @@ public interface PagedReference extends MessageReference {
    PagePosition getPosition();
 
    PagedMessage getPagedMessage();
+
+   boolean isLargeMessage();
+
+   long getTransactionID();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 7189007..42c5423 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -53,6 +53,12 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    private Object protocolData;
 
+   private final boolean largeMessage;
+
+   private final long transactionID;
+
+   private final long messageID;
+
    @Override
    public Object getProtocolData() {
       return protocolData;
@@ -95,6 +101,9 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
       this.position = position;
       this.message = new WeakReference<>(message);
       this.subscription = subscription;
+      this.largeMessage = message.getMessage().isLargeMessage();
+      this.transactionID = message.getTransactionID();
+      this.messageID = message.getMessage().getMessageID();
    }
 
    @Override
@@ -256,4 +265,19 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
       return this.consumerId;
    }
 
+   @Override
+   public boolean isLargeMessage() {
+      return largeMessage;
+   }
+
+   @Override
+   public long getTransactionID() {
+      return transactionID;
+   }
+
+   @Override
+   public long getMessageID() {
+      return messageID;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index a674935..24c69be 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -849,8 +849,8 @@ final class PageSubscriptionImpl implements PageSubscription {
    }
 
    private PageTransactionInfo getPageTransaction(final PagedReference reference) throws ActiveMQException {
-      if (reference.getPagedMessage().getTransactionID() >= 0) {
-         return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
+      if (reference.getTransactionID() >= 0) {
+         return pageStore.getPagingManager().getTransaction(reference.getTransactionID());
       } else {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 799b0b0..906ea7e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -38,6 +38,8 @@ public interface MessageReference {
 
    Message getMessage();
 
+   long getMessageID();
+
    /**
     * We define this method aggregation here because on paging we need to hold the original estimate,
     * so we need to perform some extra steps on paging.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 7aada5e..90b8814 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -238,6 +238,11 @@ public class LastValueQueue extends QueueImpl {
       }
 
       @Override
+      public long getMessageID() {
+         return getMessage().getMessageID();
+      }
+
+      @Override
       public Queue getQueue() {
          return ref.getQueue();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 1b434bc..7543ba5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -147,6 +147,11 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
    }
 
    @Override
+   public long getMessageID() {
+      return getMessage().getMessageID();
+   }
+
+   @Override
    public Queue getQueue() {
       return queue;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 6bf69ed..e492985 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -159,7 +160,9 @@ public class RefsOperation extends TransactionOperationAbstract {
 
       if (pagedMessagesToPostACK != null) {
          for (MessageReference refmsg : pagedMessagesToPostACK) {
-            decrementRefCount(refmsg);
+            if (((PagedReference) refmsg).isLargeMessage()) {
+               decrementRefCount(refmsg);
+            }
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 45dd05c..95d613e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -867,7 +867,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
             acks++;
          }
-         while (ref.getMessage().getMessageID() != messageID);
+         while (ref.getMessageID() != messageID);
 
          if (startedTransaction) {
             tx.commit();

Reply via email to