This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.19.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit f23aadf823c1412a8479df32bd9715edf8ecac4e
Author: franz1981 <[email protected]>
AuthorDate: Tue Nov 16 10:58:56 2021 +0100

    ARTEMIS-3577 Save Core msg re-encoding due to msg copy
    
    (cherry picked from commit 185236f74d10300028462ca62b70d7d489665d91)
---
 .../artemis/core/server/impl/QueueImpl.java        | 29 +++++++++++++---------
 1 file changed, 17 insertions(+), 12 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e572c90..99157b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2649,7 +2649,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
-            return moveBetweenSnFQueues(queueSuffix, tx, ref);
+            return moveBetweenSnFQueues(queueSuffix, tx, ref, null);
          }
       });
    }
@@ -3387,9 +3387,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                      final boolean expiry,
                      final boolean rejectDuplicate,
                      final long... queueIDs) throws Exception {
-      Message copyMessage = makeCopy(ref, expiry);
-
-      copyMessage.setAddress(toAddress);
+      Message copyMessage = makeCopy(ref, expiry, toAddress);
 
       Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
       if (originalRoutingType != null && originalRoutingType instanceof Byte) {
@@ -3417,8 +3415,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    @SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
    private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
                                      final Transaction tx,
-                                     final MessageReference ref) throws Exception {
-      Message copyMessage = makeCopy(ref, false, false);
+                                     final MessageReference ref,
+                                     final SimpleString newAddress) throws Exception {
+      Message copyMessage = makeCopy(ref, false, false, newAddress);
 
       byte[] oldRouteToIDs = null;
       String targetNodeID;
@@ -3521,13 +3520,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return new Pair<>(targetNodeID, targetBinding);
    }
 
-   private Message makeCopy(final MessageReference ref, final boolean expiry) throws Exception {
-      return makeCopy(ref, expiry, true);
+   private Message makeCopy(final MessageReference ref, final boolean expiry, final SimpleString newAddress) throws Exception {
+      return makeCopy(ref, expiry, true, newAddress);
    }
 
    private Message makeCopy(final MessageReference ref,
                             final boolean expiry,
-                            final boolean copyOriginalHeaders) throws Exception {
+                            final boolean copyOriginalHeaders,
+                            final SimpleString newAddress) throws Exception {
       if (ref == null) {
          ActiveMQServerLogger.LOGGER.nullRefMessage();
          throw new ActiveMQNullRefException("Reference to message is null");
@@ -3547,6 +3547,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       Message copy = message.copy(newID, true);
 
+      if (newAddress != null) {
+         // setting it before checkLargeMessage:
+         // checkLargeMessage can cause msg encoding and setting it later invalidate it,
+         // forcing to be re-encoded later
+         copy.setAddress(newAddress);
+      }
+
       if (copyOriginalHeaders) {
          copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());
       }
@@ -3706,9 +3713,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          tx = new TransactionImpl(storageManager);
       }
 
-      Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
-
-      copyMessage.setAddress(address);
+      Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED, address);
 
       RoutingStatus routingStatus = postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
 

Reply via email to