clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1173271714


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = 
SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int 
clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new
 
QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new 
CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new 
AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new 
LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", 
"tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", 
"tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new 
ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new 
AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + 
mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new
 AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new 
SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = 
sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = 
sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
+
+      // if you see this message, most likely the notifications are being 
copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+   }
+
+   @Test
+   public void testTopicRedistributionAMQP() throws Exception {
+      internalTopicRedistribution("AMQP");
+   }
+
+   @Test
+   public void testTopicRedistributionCORE() throws Exception {
+      internalTopicRedistribution("CORE");
+   }
+
+   public void internalTopicRedistribution(String protocol) throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      final int numMessages = 100;
+
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, 
subscriptionName);
+         consumer.close();
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, 
subscriptionName);
+         consumer.close();
+      }
+
+      Wait.assertTrue(() -> 
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> 
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> 
b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> 
b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+
+      // naming convention is different between the protocols, I'm navigating 
through the bindings to find the actual queue name
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propaged into b1
+         
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, 
b) -> {
+            logger.debug("{} = {}", n, b);
+            if (b instanceof LocalQueueBinding) {
+               QueueBinding qb = (QueueBinding) b;
+               subscriptionSet.add(qb.getUniqueName().toString());
+               Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != 
null);
+            }
+         });
+         Assert.assertEquals(1, subscriptionSet.size());
+         subscriptionQueueName = subscriptionSet.iterator().next();
+      }
+
+      // making sure the queues created on a2 are propaged into b2
+      
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, 
b) -> {
+         logger.debug("{} = {}", n, b);
+         if (b instanceof LocalQueueBinding) {
+            QueueBinding qb = (QueueBinding) b;
+            Wait.assertTrue(() -> b2.locateQueue(qb.getUniqueName()) != null);
+         }
+      });
+
+      Queue a1TopicSubscription = a1.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a1TopicSubscription);
+      Queue a2TopicSubscription = a2.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a2TopicSubscription);
+      Queue b1TopicSubscription = b1.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(b1TopicSubscription);
+      Queue b2TopicSubscription = b2.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a2);
+
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(topic);
+         for (int i = 0; i < numMessages; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+
+      Assert.assertEquals(0, a1TopicSubscription.getConsumerCount());
+      Wait.assertEquals(numMessages / 2, a1TopicSubscription::getMessageCount);
+      Wait.assertEquals(numMessages / 2, a2TopicSubscription::getMessageCount);
+
+      logger.debug("b1={}. b2={}", b1TopicSubscription.getMessageCount(), 
b2TopicSubscription.getMessageCount());
+
+      Wait.assertEquals(numMessages / 2, b1TopicSubscription::getMessageCount);
+      Wait.assertEquals(numMessages / 2, b2TopicSubscription::getMessageCount);
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < numMessages; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createSharedDurableConsumer(topic, 
subscriptionName);
+            } else {
+               place = "A2";
+               consumer = sessionA2.createSharedDurableConsumer(topic, 
subscriptionName);
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      // if you see this message, most likely the notifications are being 
copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+
+      assertEmptyQueue(a1TopicSubscription);
+      assertEmptyQueue(a2TopicSubscription);
+      assertEmptyQueue(b1TopicSubscription);
+      assertEmptyQueue(b2TopicSubscription);
+   }
+
+   // This test is playing with Remote binding routing, similarly to how topic 
redistribution would happen
+   @Test
+   public void testRemoteBindingRouting() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" 
+ i);
+         }
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" 
+ i);
+         }
+      }
+
+      Wait.assertTrue(() -> 
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+
+      List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
+      // making sure the queues created on a2 are propaged into b2
+      
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, 
b) -> {
+         if (b instanceof RemoteQueueBindingImpl && 
b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
+            logger.debug("{} = {}", a, b);
+            remoteQueueBindings_a2.add((RemoteQueueBinding) b);
+         }
+      });
+
+      Assert.assertEquals(1, remoteQueueBindings_a2.size());
+
+      RoutingContext routingContext = new RoutingContextImpl(new 
TransactionImpl(a2.getStorageManager())).setMirrorOption(MirrorOption.individualRoute);
+
+      Message directMessage = new 
CoreMessage(a2.getStorageManager().generateID(), 512);
+      directMessage.setAddress(TOPIC_NAME);
+      directMessage.putStringProperty("Test", "t1");
+      remoteQueueBindings_a2.get(0).route(directMessage, routingContext);
+      a2.getPostOffice().processRoute(directMessage, routingContext, false);
+      routingContext.getTransaction().commit();
+
+      for (int i = 0; i < 10; i++) {
+         String name = "my-topic-shared-subscription_" + i + ":global";
+         Wait.assertEquals(i == 0 ? 1 : 0, 
a1.locateQueue(name)::getMessageCount);
+         logger.debug("a1 queue {} with {} messages", name, 
a1.locateQueue(name).getMessageCount());
+         logger.debug("b1 queue {} with {} messages", name, 
b1.locateQueue(name).getMessageCount());
+         logger.debug("a2 queue {} with {} messages", name, 
a2.locateQueue(name).getMessageCount());
+         logger.debug("b2 queue {} with {} messages", name, 
b2.locateQueue(name).getMessageCount());
+         Wait.assertEquals(i == 0 ? 1 : 0, 
b1.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
+      }
+   }
+
+   // This test has distinct subscription on each node and it is making sure 
the Mirror Routing is working accurately
+   @Test
+   public void testMultiNodeSubscription() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" 
+ i);
+         }
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 10; i < 20; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" 
+ i);
+         }
+      }
+
+      Wait.assertTrue(() -> 
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageProducer producer = session.createProducer(topic);
+         producer.send(session.createTextMessage("hello"));
+      }
+
+      Thread.sleep(5000);

Review Comment:
   oops... removing this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to