This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch 2.19.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit f23aadf823c1412a8479df32bd9715edf8ecac4e Author: franz1981 <[email protected]> AuthorDate: Tue Nov 16 10:58:56 2021 +0100 ARTEMIS-3577 Save Core msg re-encoding due to msg copy (cherry picked from commit 185236f74d10300028462ca62b70d7d489665d91) --- .../artemis/core/server/impl/QueueImpl.java | 29 +++++++++++++--------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index e572c90..99157b1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2649,7 +2649,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { - return moveBetweenSnFQueues(queueSuffix, tx, ref); + return moveBetweenSnFQueues(queueSuffix, tx, ref, null); } }); } @@ -3387,9 +3387,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final boolean expiry, final boolean rejectDuplicate, final long... queueIDs) throws Exception { - Message copyMessage = makeCopy(ref, expiry); - - copyMessage.setAddress(toAddress); + Message copyMessage = makeCopy(ref, expiry, toAddress); Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE); if (originalRoutingType != null && originalRoutingType instanceof Byte) { @@ -3417,8 +3415,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"}) private boolean moveBetweenSnFQueues(final SimpleString queueSuffix, final Transaction tx, - final MessageReference ref) throws Exception { - Message copyMessage = makeCopy(ref, false, false); + final MessageReference ref, + final SimpleString newAddress) throws Exception { + Message copyMessage = makeCopy(ref, false, false, newAddress); byte[] oldRouteToIDs = null; String targetNodeID; @@ -3521,13 +3520,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return new Pair<>(targetNodeID, targetBinding); } - private Message makeCopy(final MessageReference ref, final boolean expiry) throws Exception { - return makeCopy(ref, expiry, true); + private Message makeCopy(final MessageReference ref, final boolean expiry, final SimpleString newAddress) throws Exception { + return makeCopy(ref, expiry, true, newAddress); } private Message makeCopy(final MessageReference ref, final boolean expiry, - final boolean copyOriginalHeaders) throws Exception { + final boolean copyOriginalHeaders, + final SimpleString newAddress) throws Exception { if (ref == null) { ActiveMQServerLogger.LOGGER.nullRefMessage(); throw new ActiveMQNullRefException("Reference to message is null"); @@ -3547,6 +3547,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { Message copy = message.copy(newID, true); + if (newAddress != null) { + // setting it before checkLargeMessage: + // checkLargeMessage can cause msg encoding and setting it later invalidate it, + // forcing to be re-encoded later + copy.setAddress(newAddress); + } + if (copyOriginalHeaders) { copy.referenceOriginalMessage(message, ref.getQueue().getName().toString()); } @@ -3706,9 +3713,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { tx = new TransactionImpl(storageManager); } - Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED); - - copyMessage.setAddress(address); + Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED, address); RoutingStatus routingStatus = postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
