This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ff8419a4b ARTEMIS-4312 dupes w/redistribution and multicast
     new 5722206e92 This closes #4506
3ff8419a4b is described below

commit 3ff8419a4b0e32bff5b43997d09db9a9acc28586
Author: Justin Bertram <jbert...@apache.org>
AuthorDate: Tue Jun 13 11:28:00 2023 -0500

    ARTEMIS-4312 dupes w/redistribution and multicast
    
    Multiple multicast queues on the same address can lead to duplicate
    messages during redistribution in a cluster.
---
 .../artemis/core/postoffice/impl/BindingsImpl.java |  2 +
 .../cluster/distribution/ClusterTestBase.java      | 13 +++-
 .../distribution/MessageRedistributionTest.java    | 79 ++++++++++++++++++++++
 3 files changed, 93 insertions(+), 1 deletion(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 252ae9b782..5f8ad7e3ce 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -269,7 +269,9 @@ public final class BindingsImpl implements Bindings {
       if (logger.isDebugEnabled()) {
          logger.debug("Message {} being copied as {}", message.getMessageID(), 
copyRedistribute.getMessageID());
       }
+
       copyRedistribute.setAddress(message.getAddress());
+      copyRedistribute.clearInternalProperties();
 
       if (context.getTransaction() == null) {
          context.setTransaction(new TransactionImpl(storageManager));
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index 0bfe403fc1..90d7a90fbd 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -630,6 +630,17 @@ public abstract class ClusterTestBase extends 
ActiveMQTestBase {
                               boolean autoCommitAcks,
                               final String user,
                               final String password) throws Exception {
+      addConsumer(consumerID, node, queueName, filterVal, autoCommitAcks, 
user, password, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE);
+   }
+
+   protected void addConsumer(final int consumerID,
+                              final int node,
+                              final String queueName,
+                              final String filterVal,
+                              boolean autoCommitAcks,
+                              final String user,
+                              final String password,
+                              final int ackBatchSize) throws Exception {
       try {
          if (consumers[consumerID] != null) {
             throw new IllegalArgumentException("Already a consumer at " + 
node);
@@ -641,7 +652,7 @@ public abstract class ClusterTestBase extends 
ActiveMQTestBase {
             throw new IllegalArgumentException("No sf at " + node);
          }
 
-         ClientSession session = addClientSession(sf.createSession(user, 
password, false, false, autoCommitAcks, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, 
ActiveMQClient.DEFAULT_ACK_BATCH_SIZE));
+         ClientSession session = addClientSession(sf.createSession(user, 
password, false, false, autoCommitAcks, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, 
ackBatchSize));
 
          String filterString = null;
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
index 151c9d9d0c..3f8c825798 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
@@ -173,6 +173,85 @@ public class MessageRedistributionTest extends 
ClusterTestBase {
       logger.debug("Test done");
    }
 
+   @Test
+   public void testRedistributionWithMultipleQueuesOnTheSameAddress() throws 
Exception {
+      final int MESSAGE_COUNT = 10;
+      final String ADDRESS = "myAddress";
+      final String QUEUE0 = "queue0";
+      final String QUEUE1 = "queue1";
+
+      getServer(0).getConfiguration().addAddressSetting(ADDRESS, new 
AddressSettings().setRedistributionDelay(0));
+      getServer(1).getConfiguration().addAddressSetting(ADDRESS, new 
AddressSettings().setRedistributionDelay(0));
+
+      setupClusterConnection("cluster0", ADDRESS, 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster1", ADDRESS, 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+      startServers(0, 1);
+
+      waitForTopology(servers[0], 2);
+      waitForTopology(servers[1], 2);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, ADDRESS, QUEUE0, null, false);
+      createQueue(0, ADDRESS, QUEUE1, null, false);
+
+      createQueue(1, ADDRESS, QUEUE0, null, false);
+      createQueue(1, ADDRESS, QUEUE1, null, false);
+
+      addConsumer(0, 0, QUEUE0, null, true, null, null, 0);
+      addConsumer(1, 1, QUEUE0, null, true, null, null, 0);
+
+      waitForBindings(0, ADDRESS, 2, 1, true);
+      waitForBindings(0, ADDRESS, 2, 1, false);
+
+      waitForBindings(1, ADDRESS, 2, 1, true);
+      waitForBindings(1, ADDRESS, 2, 1, false);
+
+      send(0, ADDRESS, MESSAGE_COUNT, true, null, RoutingType.MULTICAST, null);
+
+      { // make sure all the messages were delivered to the proper queues & nodes
+         Wait.assertEquals(5L, () -> 
servers[0].locateQueue(QUEUE0).getMessagesAdded(), 2000, 100);
+         Wait.assertEquals(5L, () -> 
servers[1].locateQueue(QUEUE0).getMessagesAdded(), 2000, 100);
+
+         Wait.assertEquals(10L, () -> 
servers[0].locateQueue(QUEUE1).getMessageCount(), 2000, 100);
+         Wait.assertEquals(0L, () -> 
servers[1].locateQueue(QUEUE1).getMessageCount(), 2000, 100);
+      }
+
+      for (int i = 0; i < MESSAGE_COUNT / 2; i++) {
+         {
+            ClientMessage m = consumers[0].getConsumer().receive(1000);
+            assertNotNull(m);
+            m.acknowledge();
+         }
+         {
+            ClientMessage m = consumers[1].getConsumer().receive(1000);
+            assertNotNull(m);
+            m.acknowledge();
+         }
+      }
+
+      { // make sure all the messages were consumed properly
+         Wait.assertEquals(5L, () -> 
servers[0].locateQueue(QUEUE0).getMessagesAcknowledged(), 2000, 100);
+         Wait.assertEquals(0L, () -> 
servers[0].locateQueue(QUEUE0).getMessageCount(), 2000, 100);
+
+         Wait.assertEquals(5L, () -> 
servers[1].locateQueue(QUEUE0).getMessagesAcknowledged(), 2000, 100);
+         Wait.assertEquals(0L, () -> 
servers[1].locateQueue(QUEUE0).getMessageCount(), 2000, 100);
+      }
+
+      // add consumer to force redistribution of messages to node 1
+      addConsumer(2, 1, QUEUE1, null);
+      waitForBindings(1, ADDRESS, 2, 2, true);
+      waitForBindings(0, ADDRESS, 2, 2, false);
+
+      Wait.assertEquals(10L, () -> 
servers[1].locateQueue(QUEUE1).getMessageCount(), 2000, 100);
+      Wait.assertEquals(0L, () -> 
servers[0].locateQueue(QUEUE1).getMessageCount(), 2000, 100);
+
+      // ensure no messages were inadvertently redistributed to the wrong queue (i.e. the main point of this test)
+      Wait.assertEquals(0L, () -> 
servers[1].locateQueue(QUEUE0).getMessageCount(), 2000, 100);
+   }
+
    //https://issues.jboss.org/browse/HORNETQ-1057
    @Test
    public void testRedistributionStopsWhenConsumerAdded() throws Exception {

Reply via email to