This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new bf83a9b  ARTEMIS-3608 - Add distribution for Multicast messages to OFF_WITH_REDISTRIBUTION to avoid message loss
     new 124cd6c  This closes #3884
bf83a9b is described below

commit bf83a9b3d1c9573bcbed01f6846ebbed8bfb96e6
Author: AntonRoskvist <[email protected]>
AuthorDate: Thu Dec 16 11:00:27 2021 +0100

    ARTEMIS-3608 - Add distribution for Multicast messages to OFF_WITH_REDISTRIBUTION to avoid message loss
---
 .../artemis/core/postoffice/impl/BindingsImpl.java |  4 ++-
 .../cluster/distribution/ClusterTestBase.java      | 19 +++++++++++--
 .../distribution/MessageRedistributionTest.java    | 33 ++++++++++++++++++++++
 3 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index e193296..75fb7b8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.postoffice.Binding;
@@ -408,7 +410,7 @@ public final class BindingsImpl implements Bindings {
    private static boolean matchBinding(final Message message,
                                        final Binding binding,
                                        final MessageLoadBalancingType 
loadBalancingType) {
-      if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || 
loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) && 
binding instanceof RemoteQueueBinding) {
+      if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || 
loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION) && 
!Objects.equals(message.getRoutingType(), RoutingType.MULTICAST)) && binding 
instanceof RemoteQueueBinding) {
          return false;
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index 939d916..6c5d176 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -723,7 +723,7 @@ public abstract class ClusterTestBase extends 
ActiveMQTestBase {
                               final int msgEnd,
                               final boolean durable,
                               final String filterVal) throws Exception {
-      sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null);
+      sendInRange(node, address, msgStart, msgEnd, durable, filterVal, null, 
null);
    }
 
    protected void sendInRange(final int node,
@@ -732,6 +732,7 @@ public abstract class ClusterTestBase extends 
ActiveMQTestBase {
                               final int msgEnd,
                               final boolean durable,
                               final String filterVal,
+                              final RoutingType routingType,
                               final AtomicInteger duplicateDetectionSeq) 
throws Exception {
       ClientSessionFactory sf = sfs[node];
 
@@ -756,6 +757,10 @@ public abstract class ClusterTestBase extends 
ActiveMQTestBase {
                message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, 
new SimpleString(str));
             }
 
+            if (routingType != null) {
+               message.setRoutingType(routingType);
+            }
+
             message.putIntProperty(ClusterTestBase.COUNT_PROP, i);
 
             if (isLargeMessage()) {
@@ -853,7 +858,17 @@ public abstract class ClusterTestBase extends 
ActiveMQTestBase {
                        final boolean durable,
                        final String filterVal,
                        final AtomicInteger duplicateDetectionCounter) throws 
Exception {
-      sendInRange(node, address, 0, numMessages, durable, filterVal, 
duplicateDetectionCounter);
+      send(node, address, numMessages, durable, filterVal, null, 
duplicateDetectionCounter);
+   }
+
+   protected void send(final int node,
+                       final String address,
+                       final int numMessages,
+                       final boolean durable,
+                       final String filterVal,
+                       final RoutingType routingType,
+                       final AtomicInteger duplicateDetectionCounter) throws 
Exception {
+      sendInRange(node, address, 0, numMessages, durable, filterVal, 
routingType, duplicateDetectionCounter);
    }
 
    protected void verifyReceiveAllInRange(final boolean ack,
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
index 1ee78bb..360f1a0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
@@ -832,6 +832,39 @@ public class MessageRedistributionTest extends 
ClusterTestBase {
    }
 
    @Test
+   public void 
testRedistributionToRemoteMulticastConsumerLbOffWithRedistribution() throws 
Exception {
+
+      String address = "test.address";
+      String queue = "queue";
+      String clusterAddress = "test";
+      AddressSettings settings = new 
AddressSettings().setRedistributionDelay(0).setAutoCreateAddresses(true).setAutoCreateQueues(true);
+      RoutingType routingType = RoutingType.MULTICAST;
+
+      getServer(0).getAddressSettingsRepository().addMatch(address, settings);
+      getServer(1).getAddressSettingsRepository().addMatch(address, settings);
+
+      setupClusterConnection("cluster0", clusterAddress, 
MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster0", clusterAddress, 
MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 1, 0);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, address, queue, null, false, routingType);
+      addConsumer(0, 0, queue, null);
+      waitForBindings(0, address, 1, 1, true);
+      waitForBindings(1, address, 1, 1, false);
+
+      createAddressInfo(1, address, routingType, 0, false);
+
+      final int noMessages = 10;
+      send(1, address, noMessages, false, null, routingType, null);
+      verifyReceiveAll(noMessages, 0);
+
+   }
+
+   @Test
    public void testBackAndForth() throws Exception {
       for (int i = 0; i < 10; i++) {
          setupCluster(MessageLoadBalancingType.ON_DEMAND);

Reply via email to