This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new a723f634a2 ARTEMIS-4718 Diverted messages are not properly routed on
cluster remote bindings
a723f634a2 is described below
commit a723f634a28938b85326079efb4cdc02550be428
Author: AntonRoskvist
AuthorDate: Mon Apr 8 11:45:12 2024 +0200
ARTEMIS-4718 Diverted messages are not properly routed on cluster remote
bindings
---
.../artemis/core/server/impl/DivertImpl.java | 4 ++
.../distribution/SimpleSymmetricClusterTest.java | 49 ++
2 files changed, 53 insertions(+)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index c499a28d9d..be411303c8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -116,6 +116,10 @@ public class DivertImpl implements Divert {
copy.setExpiration(message.getExpiration());
+// This header could be set if the message is redistributed from a clustered broker.
+// It needs to be removed as it will interfere with upcoming routing.
+copy.removeExtraBytesProperty(Message.HDR_ROUTE_TO_IDS);
+
switch (routingType) {
case ANYCAST:
copy.setRoutingType(RoutingType.ANYCAST);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
index 15c7af3ebf..c650014c62 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
@@ -26,12 +26,15 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Ignore;
@@ -546,4 +549,50 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase {
}
+ @Test
+ public void testDivertRedistributedMessage() throws Exception {
+ final String queue = "queue0";
+ final String divertedQueueName = "divertedQueue";
+ final int messageCount = 10;
+
+ setupServer(0, true, isNetty());
+ setupServer(1, true, isNetty());
+ setupClusterConnection("cluster0", "",
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+ setupClusterConnection("cluster0", "",
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+ servers[0].getConfiguration().addAddressSetting("#", new
AddressSettings().setRedistributionDelay(0));
+ servers[1].getConfiguration().addAddressSetting("#", new
AddressSettings().setRedistributionDelay(0));
+
+ startServers(0, 1);
+
+ servers[0].deployDivert(new DivertConfiguration()
+ .setName("myDivert")
+ .setAddress(queue)
+
.setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
+ .setForwardingAddress(divertedQueueName)
+ .setExclusive(true));
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, queue, queue, null, true, RoutingType.ANYCAST);
+ createQueue(1, queue, queue, null, true, RoutingType.ANYCAST);
+ createQueue(0, divertedQueueName, divertedQueueName, null, true,
RoutingType.ANYCAST);
+ createQueue(1,