This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new e8d92b3bc5 ARTEMIS-4431 Re-encode the AMQP message annotations if hops
are updated
e8d92b3bc5 is described below
commit e8d92b3bc59f12d44ba46a45562f64f6a9fb4ff6
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Sep 15 11:08:13 2023 -0400
ARTEMIS-4431 Re-encode the AMQP message annotations if hops are updated
When the hops value is added or updated, the AMQP message must be re-encoded
so that the value is carried forward when the message is sent to the next broker.
---
.../federation/AMQPFederationAddressConsumer.java | 3 ++
.../connect/AMQPFederationAddressPolicyTest.java | 54 +++++++++++++++++++---
2 files changed, 50 insertions(+), 7 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
index c0e05b8c60..322915ad15 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
@@ -374,6 +374,9 @@ public class AMQPFederationAddressConsumer implements
FederationConsumerInternal
message.setAnnotation(MESSAGE_HOPS_ANNOTATION, numHops.intValue() +
1);
}
+ // Annotations need to be rewritten to carry the change forward.
+ message.reencode();
+
return message;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index fc2d925069..178093b6a5 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -87,6 +87,7 @@ import
org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotation
import
org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher;
import
org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
import
org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher;
+import org.hamcrest.Matchers;
import org.jgroups.util.UUID;
import org.junit.Test;
import org.slf4j.Logger;
@@ -388,16 +389,55 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
- final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
-
- try (Connection connection = factory.createConnection()) {
- final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
- session.createConsumer(session.createTopic("test"));
+ // Induce demand on the local broker which should then create a
receiver to our remote peer.
+ try (ProtonTestClient receivingPeer = new ProtonTestClient()) {
+ receivingPeer.queueClientSaslAnonymousConnect();
+ receivingPeer.connect("localhost", AMQP_PORT);
+ receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ receivingPeer.expectOpen();
+ receivingPeer.expectBegin();
+ receivingPeer.expectAttach();
+ receivingPeer.remoteOpen().withContainerId("test-sender").now();
+ receivingPeer.remoteBegin().now();
+ receivingPeer.remoteAttach().ofReceiver()
+ .withInitialDeliveryCount(0)
+ .withName("sending-peer")
+ .withSource().withAddress("test")
+
.withCapabilities("topic").also()
+ .withTarget().also()
+ .now();
+ receivingPeer.remoteFlow().withLinkCredit(10).now();
+ receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
- connection.start();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ // Check that annotation for hops is present in the forwarded
message.
+ final HeaderMatcher headerMatcher = new HeaderMatcher(true);
+ final MessageAnnotationsMatcher annotationsMatcher = new
MessageAnnotationsMatcher(true);
+ annotationsMatcher.withEntry("x-opt-test",
Matchers.equalTo("test"));
+ annotationsMatcher.withEntry(MESSAGE_HOPS_ANNOTATION.toString(),
Matchers.equalTo(1));
+ final EncodedAmqpValueMatcher bodyMatcher = new
EncodedAmqpValueMatcher("Hello World");
+ final TransferPayloadCompositeMatcher matcher = new
TransferPayloadCompositeMatcher();
+ matcher.setHeadersMatcher(headerMatcher);
+ matcher.setMessageAnnotationsMatcher(annotationsMatcher);
+ matcher.addMessageContentMatcher(bodyMatcher);
+
+ // Broker should route the federated message to the client and it
should
+ // carry the hops annotation indicating that one hop has occurred.
+ receivingPeer.expectTransfer().withPayload(matcher).accept();
+
+ peer.expectDisposition().withState().accepted();
+ peer.remoteTransfer().withHeader().withDurability(true)
+ .also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "test").also()
+ .withBody().withString("Hello World")
+ .also()
+ .withDeliveryId(1)
+ .now();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
- peer.close();
+
+ receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
}