This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 295f64bfa9 ARTEMIS-5330 use proper address name on retroactive messages
295f64bfa9 is described below

commit 295f64bfa9583c370df240e75bdaf334e7215a59
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Feb 25 11:42:47 2025 -0600

    ARTEMIS-5330 use proper address name on retroactive messages
---
 .../artemis/core/server/impl/QueueImpl.java        |  9 +++++---
 .../artemis/tests/integration/mqtt5/MQTT5Test.java | 25 ++++++++++++++++++++++
 .../integration/server/RetroactiveAddressTest.java |  1 +
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 4fd88a48fa..71943f4468 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2544,9 +2544,12 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
             RoutingContext routingContext = new RoutingContextImpl(tx);
-            
routingContext.setAddress(server.locateQueue(queueName).getAddress());
-            
server.getPostOffice().getBinding(queueName).route(ref.getMessage(), 
routingContext);
-            postOffice.processRoute(ref.getMessage(), routingContext, false);
+            SimpleString address = server.locateQueue(queueName).getAddress();
+            routingContext.setAddress(address);
+            Message m = ref.getMessage();
+            m.setAddress(address);
+            server.getPostOffice().getBinding(queueName).route(m, 
routingContext);
+            postOffice.processRoute(m, routingContext, false);
             return false;
          }
       });
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 3f80efe054..1a8fd251f1 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -93,6 +93,31 @@ public class MQTT5Test extends MQTT5TestSupport {
       assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
    }
 
+   @Test
+   @Timeout(DEFAULT_TIMEOUT_SEC)
+   public void testSimpleRetroSendReceive() throws Exception {
+      final String topic = RandomUtil.randomUUIDString();
+      server.getAddressSettingsRepository().addMatch(topic, new 
AddressSettings().setRetroactiveMessageCount(1));
+
+      MqttClient producer = createPahoClient("producer");
+      producer.connect();
+      producer.publish(topic, "myMessage".getBytes(StandardCharsets.UTF_8), 1, 
false);
+
+      CountDownLatch latch = new CountDownLatch(1);
+      MqttClient subscriber = createPahoClient("subscriber");
+      subscriber.connect();
+      subscriber.setCallback(new DefaultMqttCallback() {
+         @Override
+         public void messageArrived(String t, MqttMessage message) {
+            logger.info("Message received from topic {}, message={}", t, 
message);
+            assertEquals(topic.toString(), t);
+            latch.countDown();
+         }
+      });
+      subscriber.subscribe(topic, AT_LEAST_ONCE);
+      assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
+   }
+
    @Test
    @Timeout(DEFAULT_TIMEOUT_SEC)
    public void testTopicNameEscape() throws Exception {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java
index efb67aedb7..a7abc4024d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RetroactiveAddressTest.java
@@ -181,6 +181,7 @@ public class RetroactiveAddressTest extends 
ActiveMQTestBase {
             ClientMessage message = consumer.receive(1000);
             assertNotNull(message);
             message.acknowledge();
+            assertEquals(addressName.toString(), message.getAddress());
             assertEquals((i * COUNT) + j, (int) message.getIntProperty("xxx"));
          }
          consumer.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to