This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 295f64bfa9 ARTEMIS-5330 use proper address name on retroactive messages
295f64bfa9 is described below
commit 295f64bfa9583c370df240e75bdaf334e7215a59
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Feb 25 11:42:47 2025 -0600
ARTEMIS-5330 use proper address name on retroactive messages
---
.../artemis/core/server/impl/QueueImpl.java | 9 +++++---
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 25 ++++++++++++++++++++++
.../integration/server/RetroactiveAddressTest.java | 1 +
3 files changed, 32 insertions(+), 3 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 4fd88a48fa..71943f4468 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2544,9 +2544,12 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
@Override
public boolean actMessage(Transaction tx, MessageReference ref)
throws Exception {
RoutingContext routingContext = new RoutingContextImpl(tx);
-
routingContext.setAddress(server.locateQueue(queueName).getAddress());
-
server.getPostOffice().getBinding(queueName).route(ref.getMessage(),
routingContext);
- postOffice.processRoute(ref.getMessage(), routingContext, false);
+ SimpleString address = server.locateQueue(queueName).getAddress();
+ routingContext.setAddress(address);
+ Message m = ref.getMessage();
+ m.setAddress(address);
+ server.getPostOffice().getBinding(queueName).route(m,
routingContext);
+ postOffice.processRoute(m, routingContext, false);
return false;
}
});
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 3f80efe054..1a8fd251f1 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -93,6 +93,31 @@ public class MQTT5Test extends MQTT5TestSupport {
assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
}
+ @Test
+ @Timeout(DEFAULT_TIMEOUT_SEC)
+ public void testSimpleRetroSendReceive() throws Exception {
+ final String topic = RandomUtil.randomUUIDString();
+ server.getAddressSettingsRepository().addMatch(topic, new
AddressSettings().setRetroactiveMessageCount(1));
+
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+ producer.publish(topic, "myMessage".getBytes(StandardCharsets.UTF_8), 1,
false);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ MqttClient subscriber = createPahoClient("subscriber");
+ subscriber.connect();
+ subscriber.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String t, MqttMessage message) {
+ logger.info("Message received from topic {}, message={}", t,
message);
+ assertEquals(topic.toString(), t);
+ latch.countDown();
+ }
+ });
+ subscriber.subscribe(topic, AT_LEAST_ONCE);
+ assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
+ }
+
@Test
@Timeout(DEFAULT_TIMEOUT_SEC)
public void testTopicNameEscape() throws Exception {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java
index efb67aedb7..a7abc4024d 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java
@@ -181,6 +181,7 @@ public class RetroactiveAddressTest extends
ActiveMQTestBase {
ClientMessage message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
+ assertEquals(addressName.toString(), message.getAddress());
assertEquals((i * COUNT) + j, (int) message.getIntProperty("xxx"));
}
consumer.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact