gemmellr commented on code in PR #4445:
URL: https://github.com/apache/activemq-artemis/pull/4445#discussion_r1176379290


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -356,28 +366,140 @@ public void testRemoteBindingRouting() throws Exception {
 
       Assert.assertEquals(1, remoteQueueBindings_a2.size());
 
-      RoutingContext routingContext = new RoutingContextImpl(new 
TransactionImpl(a2.getStorageManager())).setMirrorOption(MirrorOption.individualRoute);
+      RoutingContext routingContext = new RoutingContextImpl(new 
TransactionImpl(a2.getStorageManager())).setMirrorOption(RoutingContext.MirrorOption.individualRoute);
 
       Message directMessage = new 
CoreMessage(a2.getStorageManager().generateID(), 512);
       directMessage.setAddress(TOPIC_NAME);
       directMessage.putStringProperty("Test", "t1");
+
+      // we will route a single message to subscription-0. a previous search 
found the RemoteBinding into remoteQueueBindins_a2;
       remoteQueueBindings_a2.get(0).route(directMessage, routingContext);
       a2.getPostOffice().processRoute(directMessage, routingContext, false);
       routingContext.getTransaction().commit();
 
       for (int i = 0; i < 10; i++) {
          String name = "my-topic-shared-subscription_" + i + ":global";
+
+         if (logger.isDebugEnabled()) {
+            logger.debug("a1 queue {} with {} messages", name, 
a1.locateQueue(name).getMessageCount());
+            logger.debug("b1 queue {} with {} messages", name, 
b1.locateQueue(name).getMessageCount());
+            logger.debug("a2 queue {} with {} messages", name, 
a2.locateQueue(name).getMessageCount());
+            logger.debug("b2 queue {} with {} messages", name, 
b2.locateQueue(name).getMessageCount());
+         }
+
+         // Since we routed to subscription-0 only, the outcome mirroring 
should only receive the output on subscription-0 on b1;
+         // When the routing happens after a clustered operation, mirror 
should be done individually to each routed queue.
+         // this test is validating that only subscription-0 got the message 
on both a1 and b1;
+         // notice that the initial route happened on a2, which then 
transfered the message towards a1.
+         // a1 made the copy to b1 through mirroring, and only subscription-0 
should receive a message.
+         // which is exactly what should happen through message-redistribution 
in clustering
+
          Wait.assertEquals(i == 0 ? 1 : 0, 
a1.locateQueue(name)::getMessageCount);
-         logger.debug("a1 queue {} with {} messages", name, 
a1.locateQueue(name).getMessageCount());
-         logger.debug("b1 queue {} with {} messages", name, 
b1.locateQueue(name).getMessageCount());
-         logger.debug("a2 queue {} with {} messages", name, 
a2.locateQueue(name).getMessageCount());
-         logger.debug("b2 queue {} with {} messages", name, 
b2.locateQueue(name).getMessageCount());
          Wait.assertEquals(i == 0 ? 1 : 0, 
b1.locateQueue(name)::getMessageCount);
          Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
          Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
       }
    }
 
+
+
+   // This test is faking a MirrorSend.
+   // First it will send with an empty collection, then to a single queue
+   @Test
+   public void testFakeMirrorSend() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" 
+ i);
+         }
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" 
+ i);
+         }
+      }
+
+      Wait.assertTrue(() -> 
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+
+      List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
+      // making sure the queues created on a2 are propagated into b2
+      
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, 
b) -> {
+         if (b instanceof RemoteQueueBindingImpl && 
b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
+            logger.debug("{} = {}", a, b);
+            remoteQueueBindings_a2.add((RemoteQueueBinding) b);
+         }
+      });
+
+      Assert.assertEquals(1, remoteQueueBindings_a2.size());
+      AmqpConnection connection = createAmqpConnection(new 
URI("tcp://localhost:" + A_1_PORT));
+      runAfter(connection::close);
+      AmqpSession session = connection.createSession();
+
+      AmqpMessage message = new AmqpMessage();
+      message.setAddress(TOPIC_NAME);
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.TARGET_QUEUES.toString(),
 new ArrayList<>());
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.INTERNAL_ID.toString(),
 a1.getStorageManager().generateID());
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString(), 
String.valueOf(b1.getNodeID()));
+
+      AmqpSender sender = session.createSender(mirrorName(A_1_PORT), new 
Symbol[]{Symbol.getSymbol("amq.mirror")});
+      sender.send(message);
+
+
+      for (int i = 0; i < 10; i++) {
+         String name = "my-topic-shared-subscription_" + i + ":global";
+
+         // all queues should be empty
+         Wait.assertEquals(0, a1.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, b1.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
+      }
+
+      message = new AmqpMessage();
+      message.setAddress(TOPIC_NAME);
+      ArrayList<String> singleQueue = new ArrayList<>();
+      singleQueue.add("my-topic-shared-subscription_3:global");
+      singleQueue.add("IDONTEXIST");
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.TARGET_QUEUES.toString(),
 singleQueue);
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.INTERNAL_ID.toString(),
 a1.getStorageManager().generateID());
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString(), 
String.valueOf(b1.getNodeID())); // simulating a node from b1, so it is not 
sent bak to b1

Review Comment:
   bak = back



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -356,28 +366,140 @@ public void testRemoteBindingRouting() throws Exception {
 
       Assert.assertEquals(1, remoteQueueBindings_a2.size());
 
-      RoutingContext routingContext = new RoutingContextImpl(new 
TransactionImpl(a2.getStorageManager())).setMirrorOption(MirrorOption.individualRoute);
+      RoutingContext routingContext = new RoutingContextImpl(new 
TransactionImpl(a2.getStorageManager())).setMirrorOption(RoutingContext.MirrorOption.individualRoute);
 
       Message directMessage = new 
CoreMessage(a2.getStorageManager().generateID(), 512);
       directMessage.setAddress(TOPIC_NAME);
       directMessage.putStringProperty("Test", "t1");
+
+      // we will route a single message to subscription-0. a previous search 
found the RemoteBinding into remoteQueueBindins_a2;
       remoteQueueBindings_a2.get(0).route(directMessage, routingContext);
       a2.getPostOffice().processRoute(directMessage, routingContext, false);
       routingContext.getTransaction().commit();
 
       for (int i = 0; i < 10; i++) {
          String name = "my-topic-shared-subscription_" + i + ":global";
+
+         if (logger.isDebugEnabled()) {
+            logger.debug("a1 queue {} with {} messages", name, 
a1.locateQueue(name).getMessageCount());
+            logger.debug("b1 queue {} with {} messages", name, 
b1.locateQueue(name).getMessageCount());
+            logger.debug("a2 queue {} with {} messages", name, 
a2.locateQueue(name).getMessageCount());
+            logger.debug("b2 queue {} with {} messages", name, 
b2.locateQueue(name).getMessageCount());
+         }
+
+         // Since we routed to subscription-0 only, the outcome mirroring 
should only receive the output on subscription-0 on b1;
+         // When the routing happens after a clustered operation, mirror 
should be done individually to each routed queue.
+         // this test is validating that only subscription-0 got the message 
on both a1 and b1;
+         // notice that the initial route happened on a2, which then 
transfered the message towards a1.
+         // a1 made the copy to b1 through mirroring, and only subscription-0 
should receive a message.
+         // which is exactly what should happen through message-redistribution 
in clustering
+
          Wait.assertEquals(i == 0 ? 1 : 0, 
a1.locateQueue(name)::getMessageCount);
-         logger.debug("a1 queue {} with {} messages", name, 
a1.locateQueue(name).getMessageCount());
-         logger.debug("b1 queue {} with {} messages", name, 
b1.locateQueue(name).getMessageCount());
-         logger.debug("a2 queue {} with {} messages", name, 
a2.locateQueue(name).getMessageCount());
-         logger.debug("b2 queue {} with {} messages", name, 
b2.locateQueue(name).getMessageCount());
          Wait.assertEquals(i == 0 ? 1 : 0, 
b1.locateQueue(name)::getMessageCount);
          Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
          Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
       }
    }
 
+
+
+   // This test is faking a MirrorSend.
+   // First it will send with an empty collection, then to a single queue
+   @Test
+   public void testFakeMirrorSend() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" 
+ i);
+         }
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" 
+ i);
+         }
+      }
+
+      Wait.assertTrue(() -> 
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+
+      List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
+      // making sure the queues created on a2 are propagated into b2
+      
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, 
b) -> {
+         if (b instanceof RemoteQueueBindingImpl && 
b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
+            logger.debug("{} = {}", a, b);
+            remoteQueueBindings_a2.add((RemoteQueueBinding) b);
+         }
+      });
+
+      Assert.assertEquals(1, remoteQueueBindings_a2.size());
+      AmqpConnection connection = createAmqpConnection(new 
URI("tcp://localhost:" + A_1_PORT));
+      runAfter(connection::close);
+      AmqpSession session = connection.createSession();
+
+      AmqpMessage message = new AmqpMessage();
+      message.setAddress(TOPIC_NAME);
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.TARGET_QUEUES.toString(),
 new ArrayList<>());
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.INTERNAL_ID.toString(),
 a1.getStorageManager().generateID());
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString(), 
String.valueOf(b1.getNodeID()));
+
+      AmqpSender sender = session.createSender(mirrorName(A_1_PORT), new 
Symbol[]{Symbol.getSymbol("amq.mirror")});
+      sender.send(message);
+
+
+      for (int i = 0; i < 10; i++) {
+         String name = "my-topic-shared-subscription_" + i + ":global";
+
+         // all queues should be empty

Review Comment:
   make it easy for later, specify why as well, ...'because the target queue 
list was empty'



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java:
##########
@@ -37,35 +36,45 @@ public ActiveMQServerControlUsingCoreTest(boolean 
legacyCreateQueue) {
    }
 
    @Override
-   @Test
+   @Ignore
    public void testListProducersAgainstServer() throws Exception {
-      // have to disable this test, as it's dealing with producers objects.
-      // the test itself will be using a producer to manage the server.
-      // so the test will include noise and it might fail occasionally
-      Assume.assumeTrue(false);
    }
-   // ActiveMQServerControlTest overrides --------------------------
+
+
+   @Ignore
+   @Override
+   public void testListSessions() throws Exception {
+      // similarly to testListProducersAgainstServer test,
+      // this test will have different objecgs created when running over core,
+      // what may introduce noise to the test

Review Comment:
   objecgs = objects
   
   Its not clear what 'noise' is being referenced or why it matters...the 
description you removed for the other test was actually clearer, that the test 
creates producers and these interfere with inspecting producers.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -42,7 +44,6 @@
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.MirrorOption;

Review Comment:
   This is fine, though if you had changed it import 
org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption instead 
then you wouldnt have to update all the using code to doing e.g 
"RoutingContext.MirrorOption.individualRoute", it would have worked as it was 
before.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java:
##########
@@ -37,35 +36,45 @@ public ActiveMQServerControlUsingCoreTest(boolean 
legacyCreateQueue) {
    }
 
    @Override
-   @Test
+   @Ignore
    public void testListProducersAgainstServer() throws Exception {
-      // have to disable this test, as it's dealing with producers objects.
-      // the test itself will be using a producer to manage the server.
-      // so the test will include noise and it might fail occasionally
-      Assume.assumeTrue(false);
    }
-   // ActiveMQServerControlTest overrides --------------------------
+
+
+   @Ignore
+   @Override
+   public void testListSessions() throws Exception {
+      // similarly to testListProducersAgainstServer test,

Review Comment:
   Maybe explain it in the first test testListProducersAgainstServer as it was 
originally, and then reference that from this one with just this first 
sentence...rather than reference the earlier test when it would now have no 
explanation why it is ignored after these changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to