This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new e8d92b3bc5 ARTEMIS-4431 Re-encode the AMQP message annotations if hops 
are updated
e8d92b3bc5 is described below

commit e8d92b3bc59f12d44ba46a45562f64f6a9fb4ff6
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Sep 15 11:08:13 2023 -0400

    ARTEMIS-4431 Re-encode the AMQP message annotations if hops are updated
    
    When updating or adding the hops value, the AMQP message must be re-encoded
    so that the value is carried forward when the message is sent to the next broker.
---
 .../federation/AMQPFederationAddressConsumer.java  |  3 ++
 .../connect/AMQPFederationAddressPolicyTest.java   | 54 +++++++++++++++++++---
 2 files changed, 50 insertions(+), 7 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
index c0e05b8c60..322915ad15 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
@@ -374,6 +374,9 @@ public class AMQPFederationAddressConsumer implements 
FederationConsumerInternal
          message.setAnnotation(MESSAGE_HOPS_ANNOTATION, numHops.intValue() + 
1);
       }
 
+      // Annotations need to be rewritten to carry the change forward.
+      message.reencode();
+
       return message;
    }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index fc2d925069..178093b6a5 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -87,6 +87,7 @@ import 
org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotation
 import 
org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher;
 import 
org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
 import 
org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher;
+import org.hamcrest.Matchers;
 import org.jgroups.util.UUID;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -388,16 +389,55 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
                             
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
          peer.expectFlow().withLinkCredit(1000);
 
-         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
-
-         try (Connection connection = factory.createConnection()) {
-            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
-            session.createConsumer(session.createTopic("test"));
+         // Induce demand on the local broker which should then create a 
receiver to our remote peer.
+         try (ProtonTestClient receivingPeer = new ProtonTestClient()) {
+            receivingPeer.queueClientSaslAnonymousConnect();
+            receivingPeer.connect("localhost", AMQP_PORT);
+            receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            receivingPeer.expectOpen();
+            receivingPeer.expectBegin();
+            receivingPeer.expectAttach();
+            receivingPeer.remoteOpen().withContainerId("test-sender").now();
+            receivingPeer.remoteBegin().now();
+            receivingPeer.remoteAttach().ofReceiver()
+                                        .withInitialDeliveryCount(0)
+                                        .withName("sending-peer")
+                                        .withSource().withAddress("test")
+                                                     
.withCapabilities("topic").also()
+                                        .withTarget().also()
+                                        .now();
+            receivingPeer.remoteFlow().withLinkCredit(10).now();
+            receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
-            connection.start();
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
+            // Check that annotation for hops is present in the forwarded 
message.
+            final HeaderMatcher headerMatcher = new HeaderMatcher(true);
+            final MessageAnnotationsMatcher annotationsMatcher = new 
MessageAnnotationsMatcher(true);
+            annotationsMatcher.withEntry("x-opt-test", 
Matchers.equalTo("test"));
+            annotationsMatcher.withEntry(MESSAGE_HOPS_ANNOTATION.toString(), 
Matchers.equalTo(1));
+            final EncodedAmqpValueMatcher bodyMatcher = new 
EncodedAmqpValueMatcher("Hello World");
+            final TransferPayloadCompositeMatcher matcher = new 
TransferPayloadCompositeMatcher();
+            matcher.setHeadersMatcher(headerMatcher);
+            matcher.setMessageAnnotationsMatcher(annotationsMatcher);
+            matcher.addMessageContentMatcher(bodyMatcher);
+
+            // Broker should route the federated message to the client and it 
should
+            // carry the hops annotation indicating that one hop has occurred.
+            receivingPeer.expectTransfer().withPayload(matcher).accept();
+
+            peer.expectDisposition().withState().accepted();
+            peer.remoteTransfer().withHeader().withDurability(true)
+                                 .also()
+                                 
.withMessageAnnotations().withAnnotation("x-opt-test", "test").also()
+                                 .withBody().withString("Hello World")
+                                 .also()
+                                 .withDeliveryId(1)
+                                 .now();
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-            peer.close();
+
+            receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
          }
       }
    }

Reply via email to