This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new a723f634a2 ARTEMIS-4718 Diverted messages are not propertly routed on 
cluster remote bindings
a723f634a2 is described below

commit a723f634a28938b85326079efb4cdc02550be428
Author: AntonRoskvist <anton.roskv...@volvo.com>
AuthorDate: Mon Apr 8 11:45:12 2024 +0200

    ARTEMIS-4718 Diverted messages are not propertly routed on cluster remote 
bindings
---
 .../artemis/core/server/impl/DivertImpl.java       |  4 ++
 .../distribution/SimpleSymmetricClusterTest.java   | 49 ++++++++++++++++++++++
 2 files changed, 53 insertions(+)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index c499a28d9d..be411303c8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -116,6 +116,10 @@ public class DivertImpl implements Divert {
 
             copy.setExpiration(message.getExpiration());
 
+            //This header could be set if the message is redistributed from a 
clustered broker.
+            //It needs to be removed as it will interfere with upcoming routing
+            //copy.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
+
             switch (routingType) {
                case ANYCAST:
                   copy.setRoutingType(RoutingType.ANYCAST);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
index 15c7af3ebf..c650014c62 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
@@ -26,12 +26,15 @@ import 
org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.postoffice.Binding;
+import 
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -546,4 +549,50 @@ public class SimpleSymmetricClusterTest extends 
ClusterTestBase {
 
    }
 
+   @Test
+   public void testDivertRedistributedMessage() throws Exception {
+      final String queue = "queue0";
+      final String divertedQueueName = "divertedQueue";
+      final int messageCount = 10;
+
+      setupServer(0, true, isNetty());
+      setupServer(1, true, isNetty());
+      setupClusterConnection("cluster0", "", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster0", "", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+      servers[0].getConfiguration().addAddressSetting("#", new 
AddressSettings().setRedistributionDelay(0));
+      servers[1].getConfiguration().addAddressSetting("#", new 
AddressSettings().setRedistributionDelay(0));
+
+      startServers(0, 1);
+
+      servers[0].deployDivert(new DivertConfiguration()
+                                 .setName("myDivert")
+                                 .setAddress(queue)
+                                 
.setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
+                                 .setForwardingAddress(divertedQueueName)
+                                 .setExclusive(true));
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, queue, queue, null, true, RoutingType.ANYCAST);
+      createQueue(1, queue, queue, null, true, RoutingType.ANYCAST);
+      createQueue(0, divertedQueueName, divertedQueueName, null, true, 
RoutingType.ANYCAST);
+      createQueue(1, divertedQueueName, divertedQueueName, null, true, 
RoutingType.ANYCAST);
+
+      addConsumer(0, 0, queue, null);
+
+      waitForBindings(0, queue, 1, 1, true);
+      waitForBindings(1, queue, 1, 1, false);
+
+      send(1, queue, messageCount, true, null);
+
+      Wait.assertEquals((long) messageCount, () -> 
servers[0].locateQueue(divertedQueueName).getMessageCount(), 2000, 100);
+
+      addConsumer(1, 1, divertedQueueName, null);
+
+      verifyReceiveAll(messageCount, 1);
+      closeAllConsumers();
+   }
+
 }

Reply via email to