gemmellr commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1173948572


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java:
##########
@@ -577,6 +599,9 @@ private String debugBindings() {
    private void routeFromCluster(final Message message,
                                  final RoutingContext context,
                                  final byte[] ids) throws Exception {
+      if (!context.isMirrorDisabled()) {
+         context.setMirrorOption(MirrorOption.individualRoute);
+      }

Review Comment:
   Why? Where is this reset?
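
   For illustration, one way the option could be scoped to this call is a save/restore around the routing work. This is only a sketch under assumptions: `Context` is a stand-in for `RoutingContext`, and `getMirrorOption()` plus the `enabled` and `disabled` values are hypothetical names that do not appear in this diff.

```java
final class MirrorOptionResetSketch {
   // Only individualRoute is taken from the diff; the other values are assumed.
   enum MirrorOption { enabled, disabled, individualRoute }

   // Hypothetical stand-in for RoutingContext; only the mirror option is modelled.
   static final class Context {
      private MirrorOption option = MirrorOption.enabled;

      MirrorOption getMirrorOption() {
         return option;
      }

      void setMirrorOption(MirrorOption newOption) {
         option = newOption;
      }

      boolean isMirrorDisabled() {
         return option == MirrorOption.disabled;
      }
   }

   // Sets individualRoute for the duration of the routing work, then restores
   // whatever option the context carried before the call.
   static void routeFromCluster(Context context, Runnable routing) {
      MirrorOption previous = context.getMirrorOption();
      if (!context.isMirrorDisabled()) {
         context.setMirrorOption(MirrorOption.individualRoute);
      }
      try {
         routing.run();
      } finally {
         context.setMirrorOption(previous);
      }
   }
}
```

   If the context is never reused after this call then no reset is needed, but in that case a comment saying so would help.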



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -357,6 +364,13 @@ private static void setProtocolData(MessageReference ref, String brokerID, long
             daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
          }
       }
+
+      if (routingContext != null && routingContext.isMirrorIndividualRoute()) {
+         ArrayList<String> queues = new ArrayList<>();
+         routingContext.forEachDurable(q -> queues.add(String.valueOf(q.getName())));
+         daMap.put(TARGET_QUEUES, queues);
+      }

Review Comment:
   Why does it only look at durable queues?
   
   What if there are none? Should it actually be sending an empty list, as it seems it will?
   
   (Also, what is a 'MirrorIndividualRoute'?)
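
   To make the empty-list question concrete: a minimal sketch, assuming the annotation should simply be omitted when no durable queue was routed. A plain `Map<String, Object>` stands in for the delivery annotations map and `addTargetQueues` is a hypothetical helper; neither is the PR's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class TargetQueuesSketch {
   // Key name as defined in the diff, kept as a plain String for this sketch.
   static final String TARGET_QUEUES = "x-opt-amq-trg-q";

   // Adds the target-queue annotation only when at least one queue name exists,
   // so the receiving side never has to interpret an empty list.
   static void addTargetQueues(Map<String, Object> annotations, List<String> durableQueueNames) {
      if (durableQueueNames.isEmpty()) {
         return;
      }
      // Defensive copy so later changes to the caller's list do not leak in.
      annotations.put(TARGET_QUEUES, new ArrayList<>(durableQueueNames));
   }
}
```

   Whether omitting the key or sending an empty list is the right behaviour depends on how the target interprets a missing annotation, which is why I'm asking.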
   



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -76,14 +77,20 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
    public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id");
    public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst");
 
+   /** When a clustered node (from regular cluster connections) receives a message
+       it will have target queues associated with it
+      this could be from message redistribution or simply load balancing.
+      an that case this will have the queue associated with it */
+   public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-trg-q");

Review Comment:
   The last sentence of the javadoc (should that be a comment?) is a bit broken.
   
   Is this mirroring-specific? It's defined in a mirroring class, and seemingly only referenced in others, suggesting so. If so, should it start with "x-opt-amq-mr-" like all the other mirroring-related key names?

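   A small sketch of the naming I have in mind, assuming the key really is mirroring-specific. Plain strings stand in for `Symbol`, and `MIRROR_PREFIX`, `mirrorKeys` and `allUseMirrorPrefix` are hypothetical names used only for illustration; the renamed value "x-opt-amq-mr-trg-q" is a suggestion, not something in the PR.

```java
import java.util.Arrays;
import java.util.List;

final class MirrorKeyNamesSketch {
   static final String MIRROR_PREFIX = "x-opt-amq-mr-";

   // Existing keys from the diff, expressed through the shared prefix.
   static final String INTERNAL_ID = MIRROR_PREFIX + "id";
   static final String INTERNAL_DESTINATION = MIRROR_PREFIX + "dst";

   // Hypothetical rename of the new key so that it follows the same convention.
   static final String TARGET_QUEUES = MIRROR_PREFIX + "trg-q";

   static List<String> mirrorKeys() {
      return Arrays.asList(INTERNAL_ID, INTERNAL_DESTINATION, TARGET_QUEUES);
   }

   // True when every key carries the mirroring prefix.
   static boolean allUseMirrorPrefix(List<String> keys) {
      return keys.stream().allMatch(k -> k.startsWith(MIRROR_PREFIX));
   }
}
```
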


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/** This is to be used in conjunction with RoutingContext, where we control certain semantics during routing.
+ *  */
+public enum MirrorOption {

Review Comment:
   This can never realistically carry any other 'mirror options', so that name really doesn't seem to fit well. As the comment says, it is just a routing control really. RoutingContextMirrorControl?
   



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);

Review Comment:
   All server names are "node_1"? What is the impact of that?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+
+      ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
+
+      // if you see this message, most likely the notifications are being copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+   }
+
+   @Test
+   public void testTopicRedistributionAMQP() throws Exception {
+      internalTopicRedistribution("AMQP");
+   }
+
+   @Test
+   public void testTopicRedistributionCORE() throws Exception {
+      internalTopicRedistribution("CORE");
+   }
+
+   public void internalTopicRedistribution(String protocol) throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      final int numMessages = 100;
+
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+
+      // naming convention is different between the protocols, I'm navigating through the bindings to find the actual queue name
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propaged into b1

Review Comment:
   Typo: "propaged" should be "propagated".



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propaged into b1
+         a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
+            logger.debug("{} = {}", n, b);
+            if (b instanceof LocalQueueBinding) {
+               QueueBinding qb = (QueueBinding) b;
+               subscriptionSet.add(qb.getUniqueName().toString());
+               Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != null);
+            }
+         });
+         Assert.assertEquals(1, subscriptionSet.size());
+         subscriptionQueueName = subscriptionSet.iterator().next();
+      }
+
+      // making sure the queues created on a2 are propaged into b2

Review Comment:
   Typo: 'propaged' should be 'propagated'.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = 
SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int 
clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new
 
QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new 
AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new 
LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", 
"tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", 
"tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new 
ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new 
AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + 
mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new
 AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new 
SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = 
sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = 
sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
+
+      // if you see this message, most likely the notifications are being 
copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+   }
+
+   @Test
+   public void testTopicRedistributionAMQP() throws Exception {
+      internalTopicRedistribution("AMQP");
+   }
+
+   @Test
+   public void testTopicRedistributionCORE() throws Exception {
+      internalTopicRedistribution("CORE");
+   }
+
+   public void internalTopicRedistribution(String protocol) throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      final int numMessages = 100;
+
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, 
subscriptionName);
+         consumer.close();
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, 
subscriptionName);
+         consumer.close();
+      }
+
+      Wait.assertTrue(() -> 
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> 
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> 
b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> 
b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+
+      // naming convention is different between the protocols, I'm navigating 
through the bindings to find the actual queue name
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propaged into b1
+         
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, 
b) -> {
+            logger.debug("{} = {}", n, b);
+            if (b instanceof LocalQueueBinding) {
+               QueueBinding qb = (QueueBinding) b;
+               subscriptionSet.add(qb.getUniqueName().toString());
+               Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != 
null);
+            }
+         });
+         Assert.assertEquals(1, subscriptionSet.size());
+         subscriptionQueueName = subscriptionSet.iterator().next();
+      }
+
+      // making sure the queues created on a2 are propaged into b2
+      
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, 
b) -> {
+         logger.debug("{} = {}", n, b);
+         if (b instanceof LocalQueueBinding) {
+            QueueBinding qb = (QueueBinding) b;
+            Wait.assertTrue(() -> b2.locateQueue(qb.getUniqueName()) != null);
+         }
+      });

Review Comment:
   This should verify and assert, as the previous check above does, that the 
forEach actually did something.
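
   As a rough sketch of the kind of check meant here (not the actual test 
code): collect what the forEach matched and assert on it afterwards, so the 
loop cannot pass silently when no binding matches. `FakeBinding`, 
`collectLocalNames` and the class name are hypothetical stand-ins for the 
real bindings and are not part of Artemis.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ForEachCheckSketch {

   // Hypothetical stand-in for a binding; only the "local" flag matters here.
   static final class FakeBinding {
      final String uniqueName;
      final boolean local;

      FakeBinding(String uniqueName, boolean local) {
         this.uniqueName = uniqueName;
         this.local = local;
      }
   }

   // Returns the names of the local bindings the loop actually visited.
   static Set<String> collectLocalNames(Map<String, FakeBinding> bindings) {
      Set<String> visited = new HashSet<>();
      bindings.forEach((n, b) -> {
         if (b.local) {
            visited.add(b.uniqueName);
            // the per-queue check (e.g. waiting for the queue on the mirror) would go here
         }
      });
      return visited;
   }

   public static void main(String[] args) {
      Map<String, FakeBinding> bindings = new LinkedHashMap<>();
      bindings.put("local", new FakeBinding("sub-queue", true));
      bindings.put("remote", new FakeBinding("sub-queue-remote", false));

      Set<String> visited = collectLocalNames(bindings);
      // Fails loudly if the forEach matched nothing, instead of passing silently.
      if (visited.size() != 1) {
         throw new AssertionError("expected exactly one local binding, got " + visited.size());
      }
      System.out.println("visited = " + visited);
   }
}
```

   In the test itself this would presumably amount to filling a set inside the 
a2 forEach and asserting its size afterwards, as the a1 block already does.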



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = 
SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int 
clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new
 
QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new 
AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new 
LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", 
"tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", 
"tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new 
ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new 
AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + 
mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new
 AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new 
SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = 
sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = 
sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
+
+      // if you see this message, most likely the notifications are being 
copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+   }
+
+   @Test
+   public void testTopicRedistributionAMQP() throws Exception {
+      internalTopicRedistribution("AMQP");
+   }
+
+   @Test
+   public void testTopicRedistributionCORE() throws Exception {
+      internalTopicRedistribution("CORE");
+   }
+
+   public void internalTopicRedistribution(String protocol) throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      final int numMessages = 100;
+
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, 
subscriptionName);
+         consumer.close();
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, 
subscriptionName);
+         consumer.close();
+      }
+
+      Wait.assertTrue(() -> 
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> 
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> 
b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> 
b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+
+      // naming convention is different between the protocols, I'm navigating 
through the bindings to find the actual queue name
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propaged into b1
+         
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, 
b) -> {
+            logger.debug("{} = {}", n, b);
+            if (b instanceof LocalQueueBinding) {
+               QueueBinding qb = (QueueBinding) b;
+               subscriptionSet.add(qb.getUniqueName().toString());
+               Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != 
null);
+            }
+         });
+         Assert.assertEquals(1, subscriptionSet.size());
+         subscriptionQueueName = subscriptionSet.iterator().next();
+      }
+
+      // making sure the queues created on a2 are propaged into b2
+      
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, 
b) -> {
+         logger.debug("{} = {}", n, b);
+         if (b instanceof LocalQueueBinding) {
+            QueueBinding qb = (QueueBinding) b;
+            Wait.assertTrue(() -> b2.locateQueue(qb.getUniqueName()) != null);
+         }
+      });
+
+      Queue a1TopicSubscription = a1.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a1TopicSubscription);
+      Queue a2TopicSubscription = a2.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a2TopicSubscription);
+      Queue b1TopicSubscription = b1.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(b1TopicSubscription);
+      Queue b2TopicSubscription = b2.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a2);
+
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(topic);
+         for (int i = 0; i < numMessages; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+
+      Assert.assertEquals(0, a1TopicSubscription.getConsumerCount());
+      Wait.assertEquals(numMessages / 2, a1TopicSubscription::getMessageCount);
+      Wait.assertEquals(numMessages / 2, a2TopicSubscription::getMessageCount);
+
+      logger.debug("b1={}. b2={}", b1TopicSubscription.getMessageCount(), 
b2TopicSubscription.getMessageCount());
+
+      Wait.assertEquals(numMessages / 2, b1TopicSubscription::getMessageCount);
+      Wait.assertEquals(numMessages / 2, b2TopicSubscription::getMessageCount);
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < numMessages; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createSharedDurableConsumer(topic, 
subscriptionName);
+            } else {
+               place = "A2";
+               consumer = sessionA2.createSharedDurableConsumer(topic, 
subscriptionName);
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      // if you see this message, most likely the notifications are being 
copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+
+      assertEmptyQueue(a1TopicSubscription);
+      assertEmptyQueue(a2TopicSubscription);
+      assertEmptyQueue(b1TopicSubscription);
+      assertEmptyQueue(b2TopicSubscription);
+   }
+
+   // This test is playing with Remote binding routing, similarly to how topic 
redistribution would happen
+   @Test
+   public void testRemoteBindingRouting() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" 
+ i);
+         }
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" 
+ i);
+         }
+      }
+
+      Wait.assertTrue(() -> 
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+
+      List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
+      // making sure the queues created on a2 are propaged into b2
+      
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, 
b) -> {
+         if (b instanceof RemoteQueueBindingImpl && 
b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
+            logger.debug("{} = {}", a, b);
+            remoteQueueBindings_a2.add((RemoteQueueBinding) b);
+         }
+      });
+
+      Assert.assertEquals(1, remoteQueueBindings_a2.size());
+
+      RoutingContext routingContext = new RoutingContextImpl(new 
TransactionImpl(a2.getStorageManager())).setMirrorOption(MirrorOption.individualRoute);
+
+      Message directMessage = new 
CoreMessage(a2.getStorageManager().generateID(), 512);
+      directMessage.setAddress(TOPIC_NAME);
+      directMessage.putStringProperty("Test", "t1");
+      remoteQueueBindings_a2.get(0).route(directMessage, routingContext);
+      a2.getPostOffice().processRoute(directMessage, routingContext, false);
+      routingContext.getTransaction().commit();
+
+      for (int i = 0; i < 10; i++) {
+         String name = "my-topic-shared-subscription_" + i + ":global";
+         Wait.assertEquals(i == 0 ? 1 : 0, 
a1.locateQueue(name)::getMessageCount);
+         logger.debug("a1 queue {} with {} messages", name, 
a1.locateQueue(name).getMessageCount());
+         logger.debug("b1 queue {} with {} messages", name, 
b1.locateQueue(name).getMessageCount());
+         logger.debug("a2 queue {} with {} messages", name, 
a2.locateQueue(name).getMessageCount());
+         logger.debug("b2 queue {} with {} messages", name, 
b2.locateQueue(name).getMessageCount());
+         Wait.assertEquals(i == 0 ? 1 : 0, 
b1.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
+      }

Review Comment:
   A comment explaining what this bit is doing might be in order. It took me a 
while to notice the 'individualRoute' option and the 'get(0).route' tricks. 
Until then, reading the test, it just looked like only 1 of 10 subscriptions 
got the message when all subscriptions would be expected to get it. Only on 
closer inspection did those tricks stand out. Later, without the context of 
the wider commit, it will be harder to understand than it is now.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -707,6 +708,10 @@ public QueueImpl(final QueueConfiguration 
queueConfiguration,
 
       this.server = server;
 
+      if (queueConfiguration.isInternal()) {
+         this.internalQueue = queueConfiguration.isInternal();
+      }

Review Comment:
   The 'if' is superfluous; just set the value.
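
   A minimal sketch of the simplification, using a stand-in class rather than 
the real QueueImpl. The class name and accessor are hypothetical, and it 
assumes the field starts out as false, so the guarded and unguarded forms end 
up with the same value.

```java
// Hypothetical stand-in for the relevant constructor logic only;
// the real QueueImpl has many more fields and parameters.
final class InternalFlagSketch {

   // Assumption: the field defaults to false before the constructor runs.
   private boolean internalQueue;

   InternalFlagSketch(boolean configuredInternal) {
      // Direct assignment: with a false default this gives the same result
      // as "if (configuredInternal) { internalQueue = configuredInternal; }".
      this.internalQueue = configuredInternal;
   }

   boolean isInternalQueue() {
      return internalQueue;
   }
}
```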



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java:
##########
@@ -40,9 +41,15 @@ public interface RoutingContext {
 
    /** If the routing is from MirrorController, we don't redo mirrorController
     *  to avoid*/
+   MirrorOption getMirrorOption();

Review Comment:
   The existing comment above it is now inaccurate.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -76,14 +77,20 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
    public static final Symbol INTERNAL_ID = 
Symbol.getSymbol("x-opt-amq-mr-id");
    public static final Symbol INTERNAL_DESTINATION = 
Symbol.getSymbol("x-opt-amq-mr-dst");
 
+   /** When a clustered node (from regular cluster connections) receives a 
message
+       it will have target queues associated with it
+      this could be from message redistribution or simply load balancing.
+      an that case this will have the queue associated with it */
+   public static final Symbol TARGET_QUEUES = 
Symbol.getSymbol("x-opt-amq-trg-q");
+
    // Capabilities
    public static final Symbol MIRROR_CAPABILITY = 
Symbol.getSymbol("amq.mirror");
    public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = 
Symbol.valueOf("qd.waypoint");
 
    public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = 
SimpleString.toSimpleString(INTERNAL_ID.toString());
    public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = 
SimpleString.toSimpleString(BROKER_ID.toString());
 
-   private static final ThreadLocal<RoutingContext> mirrorControlRouting = 
ThreadLocal.withInitial(() -> new 
RoutingContextImpl(null).setMirrorDisabled(true));
+   private static final ThreadLocal<RoutingContext> mirrorControlRouting = 
ThreadLocal.withInitial(() -> new 
RoutingContextImpl(null).setMirrorOption(MirrorOption.disabled));

Review Comment:
   o.a.a.a.core.server.MirrorOption seems like an odd name for this, when it's 
not really a general option of the Mirror, but a specific behaviour control for 
the RoutingContext. A more routing-specific name would seem appropriate.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = 
SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int 
clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new 
AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new 
LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", 
"tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", 
"tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new 
ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = 
sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = 
sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));

Review Comment:
   If this is actually creating distinct destination objects rather than 
reusing one, it should probably use the matching session for clarity (it 
currently mixes sessionA2 and sessionA1).



##########
tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java:
##########
@@ -464,6 +465,21 @@ public void 
setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBala
 
       }
 
+      @Override
+      public Binding getBinding(String name) {
+         return null;

Review Comment:
   Even if it's BindingsFake, it seems weird not to implement this when the 
other methods are.
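
   A rough sketch of a non-null implementation, assuming the fake can keep its 
bindings in a map keyed by name. BindingSketch and BindingsFakeSketch are 
hypothetical stand-ins, not the real Binding/Bindings interfaces, and the 
String-keyed lookup is an assumption for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in; the real Binding interface is much larger.
interface BindingSketch {
   String getUniqueName();
}

// Hypothetical stand-in for the test's BindingsFake.
final class BindingsFakeSketch {

   private final Map<String, BindingSketch> bindingsByName = new ConcurrentHashMap<>();

   void addBinding(BindingSketch binding) {
      bindingsByName.put(binding.getUniqueName(), binding);
   }

   // Returns the binding registered under the given name, or null if none was added.
   BindingSketch getBinding(String name) {
      return bindingsByName.get(name);
   }
}
```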


