This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new fea84e39f5 ARTEMIS-4247 BrokerClustering vs Mirror code improvements
fea84e39f5 is described below

commit fea84e39f5f821352323784cded2384f1b358cd2
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Apr 21 16:35:39 2023 -0400

    ARTEMIS-4247 BrokerClustering vs Mirror code improvements
---
 .../connect/mirror/AMQPMirrorControllerSource.java |  14 +-
 .../activemq/artemis/core/postoffice/Bindings.java |   2 +-
 .../artemis/core/postoffice/impl/BindingsImpl.java |  17 +--
 .../activemq/artemis/core/server/MirrorOption.java |  25 ----
 .../artemis/core/server/RoutingContext.java        |   8 +-
 .../artemis/core/server/impl/QueueImpl.java        |   6 +-
 .../core/server/impl/RoutingContextImpl.java       |   5 +-
 .../amqp/connect/AMQPRedistributeClusterTest.java  | 148 +++++++++++++++++++--
 .../impl/WildcardAddressManagerUnitTest.java       |  10 +-
 9 files changed, 167 insertions(+), 68 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 7c7a63f18d..160e44c1aa 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -32,9 +32,9 @@ import 
org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.MirrorOption;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@@ -77,11 +77,9 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
    public static final Symbol INTERNAL_ID = 
Symbol.getSymbol("x-opt-amq-mr-id");
    public static final Symbol INTERNAL_DESTINATION = 
Symbol.getSymbol("x-opt-amq-mr-dst");
 
-   /** When a clustered node (from regular cluster connections) receives a 
message
-       it will have target queues associated with it
-      this could be from message redistribution or simply load balancing.
-      an that case this will have the queue associated with it */
-   public static final Symbol TARGET_QUEUES = 
Symbol.getSymbol("x-opt-amq-trg-q");
+   /* In a Multi-cast address (or JMS Topics) we may in certain cases 
(clustered-routing for instance)
+      select which particular queues will receive the routing output */
+   public static final Symbol TARGET_QUEUES = 
Symbol.getSymbol("x-opt-amq-mr-trg-q");
 
    // Capabilities
    public static final Symbol MIRROR_CAPABILITY = 
Symbol.getSymbol("amq.mirror");
@@ -90,7 +88,7 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
    public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = 
SimpleString.toSimpleString(INTERNAL_ID.toString());
    public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = 
SimpleString.toSimpleString(BROKER_ID.toString());
 
-   private static final ThreadLocal<RoutingContext> mirrorControlRouting = 
ThreadLocal.withInitial(() -> new 
RoutingContextImpl(null).setMirrorOption(MirrorOption.disabled));
+   private static final ThreadLocal<RoutingContext> mirrorControlRouting = 
ThreadLocal.withInitial(() -> new RoutingContextImpl(null));
 
    final Queue snfQueue;
    final ActiveMQServer server;
@@ -614,7 +612,7 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
    public static void route(ActiveMQServer server, Message message) throws 
Exception {
       message.setMessageID(server.getStorageManager().generateID());
       RoutingContext ctx = mirrorControlRouting.get();
-      ctx.clear();
+      ctx.clear().setMirrorOption(MirrorOption.disabled);
       server.getPostOffice().route(message, ctx, false);
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
index c73d6e170f..6ac2c0ca05 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
@@ -59,7 +59,7 @@ public interface Bindings extends UnproposalListener {
 
    boolean allowRedistribute();
 
-   void forEach(BiConsumer<SimpleString, Binding> bindingConsumer);
+   void forEach(BiConsumer<String, Binding> bindingConsumer);
 
    int size();
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 16072dbf60..632df2a4af 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -39,9 +39,9 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.MirrorOption;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
@@ -69,7 +69,7 @@ public final class BindingsImpl implements Bindings {
     * This is the same as bindingsIdMap but indexed on the binding's 
uniqueName rather than ID. Two maps are
     * maintained to speed routing, otherwise we'd have to loop through the 
bindingsIdMap when routing to an FQQN.
     */
-   private final Map<SimpleString, Binding> bindingsNameMap = new 
ConcurrentHashMap<>();
+   private final Map<String, Binding> bindingsNameMap = new 
ConcurrentHashMap<>();
 
    private final Set<Binding> exclusiveBindings = new CopyOnWriteArraySet<>();
 
@@ -123,7 +123,7 @@ public final class BindingsImpl implements Bindings {
 
    @Override
    public Binding getBinding(String name) {
-      return bindingsNameMap.get(SimpleString.toSimpleString(name));
+      return bindingsNameMap.get(name);
    }
 
    @Override
@@ -138,7 +138,7 @@ public final class BindingsImpl implements Bindings {
          }
 
          bindingsIdMap.put(binding.getID(), binding);
-         bindingsNameMap.put(binding.getUniqueName(), binding);
+         bindingsNameMap.put(String.valueOf(binding.getUniqueName()), binding);
 
          if (binding instanceof RemoteQueueBinding) {
             setMessageLoadBalancingType(((RemoteQueueBinding) 
binding).getMessageLoadBalancingType());
@@ -162,7 +162,7 @@ public final class BindingsImpl implements Bindings {
 
    @Override
    public Binding removeBindingByUniqueName(final SimpleString 
bindingUniqueName) {
-      final Binding binding = bindingsNameMap.remove(bindingUniqueName);
+      final Binding binding = 
bindingsNameMap.remove(String.valueOf(bindingUniqueName));
       if (binding == null) {
          return null;
       }
@@ -174,7 +174,7 @@ public final class BindingsImpl implements Bindings {
          }
 
          bindingsIdMap.remove(binding.getID());
-         assert !bindingsNameMap.containsKey(binding.getUniqueName());
+         assert 
!bindingsNameMap.containsKey(String.valueOf(binding.getUniqueName()));
 
          if (logger.isTraceEnabled()) {
             logger.trace("Removing binding {} from {} bindingTable: {}", 
binding, this, debugBindings());
@@ -191,7 +191,7 @@ public final class BindingsImpl implements Bindings {
    }
 
    @Override
-   public void forEach(BiConsumer<SimpleString, Binding> bindingConsumer) {
+   public void forEach(BiConsumer<String, Binding> bindingConsumer) {
       bindingsNameMap.forEach(bindingConsumer);
    }
 
@@ -313,7 +313,7 @@ public final class BindingsImpl implements Bindings {
             routeUsingStrictOrdering(message, context, groupingHandler, 
groupId, 0);
          } else if (CompositeAddress.isFullyQualified(message.getAddress())) {
             context.clear().setReusable(false);
-            final Binding theBinding = 
bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
+            final Binding theBinding = 
bindingsNameMap.get(String.valueOf(CompositeAddress.extractQueueName(message.getAddressSimpleString())));
             if (theBinding != null) {
                theBinding.route(message, context);
             }
@@ -601,6 +601,7 @@ public final class BindingsImpl implements Bindings {
                                  final byte[] ids) throws Exception {
       if (!context.isMirrorDisabled()) {
          context.setMirrorOption(MirrorOption.individualRoute);
+         context.setReusable(false);
       }
       byte[] idsToAck = (byte[]) 
message.removeProperty(Message.HDR_ROUTE_TO_ACK_IDS);
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java
deleted file mode 100644
index 0ac0181ece..0000000000
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server;
-
-/** This is to be used in conjunction with RoutingContext, where we control 
certain semantics during routing.
- *  */
-public enum MirrorOption {
-   enabled,
-   disabled,
-   individualRoute
-}
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index cd64a1f484..73d7ebf381 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -39,8 +39,6 @@ public interface RoutingContext {
    */
    boolean isReusable();
 
-   /** If the routing is from MirrorController, we don't redo mirrorController
-    *  to avoid*/
    MirrorOption getMirrorOption();
 
    void forEachDurable(Consumer<Queue> consumer);
@@ -114,5 +112,11 @@ public interface RoutingContext {
 
    ServerSession getServerSession();
 
+   enum MirrorOption {
+      enabled,
+      disabled,
+      individualRoute
+   }
+
 
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 0c893eee6c..e509f464b9 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -82,10 +82,10 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.MirrorOption;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
 import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerSession;
@@ -708,9 +708,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       this.server = server;
 
-      if (queueConfiguration.isInternal()) {
-         this.internalQueue = queueConfiguration.isInternal();
-      }
+      this.internalQueue = queueConfiguration.isInternal();
 
       scheduledDeliveryHandler = new 
ScheduledDeliveryHandlerImpl(scheduledExecutor, this);
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index e5dc83d27b..8373bc2586 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -26,7 +26,6 @@ import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.MirrorOption;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -168,9 +167,7 @@ public class RoutingContextImpl implements RoutingContext {
 
       this.internalOnly = null;
 
-      if (mirrorOption == MirrorOption.individualRoute) {
-         mirrorOption = MirrorOption.enabled;
-      }
+      mirrorOption = MirrorOption.enabled;
 
       return this;
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java
index f52218910a..28c50b6f29 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java
@@ -24,11 +24,13 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import java.lang.invoke.MethodHandles;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -42,9 +44,9 @@ import 
org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.MirrorOption;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
@@ -52,9 +54,14 @@ import 
org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
 import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -105,18 +112,22 @@ public class AMQPRedistributeClusterTest extends 
AmqpTestSupport {
 
 
       server.setIdentity(name);
-      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new 
LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", 
"tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", 
"tcp://localhost:" + clusterPort);
+      server.getConfiguration().setName("node").setHAPolicyConfiguration(new 
LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", 
"tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", 
"tcp://localhost:" + clusterPort);
 
       ClusterConnectionConfiguration clusterConfiguration = new 
ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
       server.getConfiguration().addClusterConfiguration(clusterConfiguration);
 
       if (mirrorPort > 0) {
-         server.getConfiguration().addAMQPConnection(new 
AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + 
mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new
 AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new 
SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+         server.getConfiguration().addAMQPConnection(new 
AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + 
mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new
 AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new 
SimpleString(mirrorName(mirrorPort)))));
       }
 
       return server;
    }
 
+   private String mirrorName(int mirrorPort) {
+      return "$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort;
+   }
+
    @Test
    public void testQueueRedistributionAMQP() throws Exception {
       internalQueueRedistribution("AMQP");
@@ -158,7 +169,7 @@ public class AMQPRedistributeClusterTest extends 
AmqpTestSupport {
                consumer = 
sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
             } else {
                place = "A2";
-               consumer = 
sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+               consumer = 
sessionA2.createConsumer(sessionA2.createQueue(QUEUE_NAME));
             }
             TextMessage message = (TextMessage) consumer.receive(5000);
             Assert.assertNotNull(message);
@@ -225,7 +236,7 @@ public class AMQPRedistributeClusterTest extends 
AmqpTestSupport {
 
       {
          HashSet<String> subscriptionSet = new HashSet<>();
-         // making sure the queues created on a1 are propaged into b1
+         // making sure the queues created on a1 are propagated into b1
          
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, 
b) -> {
             logger.debug("{} = {}", n, b);
             if (b instanceof LocalQueueBinding) {
@@ -238,7 +249,7 @@ public class AMQPRedistributeClusterTest extends 
AmqpTestSupport {
          subscriptionQueueName = subscriptionSet.iterator().next();
       }
 
-      // making sure the queues created on a2 are propaged into b2
+      // making sure the queues created on a2 are propagated into b2
       
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, 
b) -> {
          logger.debug("{} = {}", n, b);
          if (b instanceof LocalQueueBinding) {
@@ -346,7 +357,7 @@ public class AMQPRedistributeClusterTest extends 
AmqpTestSupport {
       Wait.assertTrue(() -> 
b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
 
       List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
-      // making sure the queues created on a2 are propaged into b2
+      // making sure the queues created on a2 are propagated into b2
       
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, 
b) -> {
          if (b instanceof RemoteQueueBindingImpl && 
b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
             logger.debug("{} = {}", a, b);
@@ -361,23 +372,138 @@ public class AMQPRedistributeClusterTest extends 
AmqpTestSupport {
       Message directMessage = new 
CoreMessage(a2.getStorageManager().generateID(), 512);
       directMessage.setAddress(TOPIC_NAME);
       directMessage.putStringProperty("Test", "t1");
+
+      // we will route a single message to subscription-0. a previous search 
found the RemoteBinding into remoteQueueBindins_a2;
       remoteQueueBindings_a2.get(0).route(directMessage, routingContext);
       a2.getPostOffice().processRoute(directMessage, routingContext, false);
       routingContext.getTransaction().commit();
 
       for (int i = 0; i < 10; i++) {
          String name = "my-topic-shared-subscription_" + i + ":global";
+
+         if (logger.isDebugEnabled()) {
+            logger.debug("a1 queue {} with {} messages", name, 
a1.locateQueue(name).getMessageCount());
+            logger.debug("b1 queue {} with {} messages", name, 
b1.locateQueue(name).getMessageCount());
+            logger.debug("a2 queue {} with {} messages", name, 
a2.locateQueue(name).getMessageCount());
+            logger.debug("b2 queue {} with {} messages", name, 
b2.locateQueue(name).getMessageCount());
+         }
+
+         // Since we routed to subscription-0 only, the outcome mirroring 
should only receive the output on subscription-0 on b1;
+         // When the routing happens after a clustered operation, mirror 
should be done individually to each routed queue.
+         // this test is validating that only subscription-0 got the message 
on both a1 and b1;
+         // notice that the initial route happened on a2, which then 
transfered the message towards a1.
+         // a1 made the copy to b1 through mirroring, and only subscription-0 
should receive a message.
+         // which is exactly what should happen through message-redistribution 
in clustering
+
          Wait.assertEquals(i == 0 ? 1 : 0, 
a1.locateQueue(name)::getMessageCount);
-         logger.debug("a1 queue {} with {} messages", name, 
a1.locateQueue(name).getMessageCount());
-         logger.debug("b1 queue {} with {} messages", name, 
b1.locateQueue(name).getMessageCount());
-         logger.debug("a2 queue {} with {} messages", name, 
a2.locateQueue(name).getMessageCount());
-         logger.debug("b2 queue {} with {} messages", name, 
b2.locateQueue(name).getMessageCount());
          Wait.assertEquals(i == 0 ? 1 : 0, 
b1.locateQueue(name)::getMessageCount);
          Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
          Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
       }
    }
 
+
+
+   // This test is faking a MirrorSend.
+   // First it will send with an empty collection, then to a single queue
+   @Test
+   public void testFakeMirrorSend() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" 
+ i);
+         }
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" 
+ i);
+         }
+      }
+
+      Wait.assertTrue(() -> 
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+      Wait.assertTrue(() -> 
b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 
20);
+
+      List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
+      // making sure the queues created on a2 are propagated into b2
+      
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, 
b) -> {
+         if (b instanceof RemoteQueueBindingImpl && 
b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
+            logger.debug("{} = {}", a, b);
+            remoteQueueBindings_a2.add((RemoteQueueBinding) b);
+         }
+      });
+
+      Assert.assertEquals(1, remoteQueueBindings_a2.size());
+      AmqpConnection connection = createAmqpConnection(new 
URI("tcp://localhost:" + A_1_PORT));
+      runAfter(connection::close);
+      AmqpSession session = connection.createSession();
+
+      AmqpMessage message = new AmqpMessage();
+      message.setAddress(TOPIC_NAME);
+      // this is sending an empty ArrayList for the TARGET_QUEUES.
+      // no queues should be altered when there's an empty TARGET_QUEUES
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.TARGET_QUEUES.toString(),
 new ArrayList<>());
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.INTERNAL_ID.toString(),
 a1.getStorageManager().generateID());
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString(), 
String.valueOf(b1.getNodeID()));
+
+      AmqpSender sender = session.createSender(mirrorName(A_1_PORT), new 
Symbol[]{Symbol.getSymbol("amq.mirror")});
+      sender.send(message);
+
+
+      for (int i = 0; i < 10; i++) {
+         String name = "my-topic-shared-subscription_" + i + ":global";
+
+         // all queues should be empty
+         // because the send to the mirror had an empty TARGET_QUEUES
+         Wait.assertEquals(0, a1.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, b1.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
+      }
+
+      message = new AmqpMessage();
+      message.setAddress(TOPIC_NAME);
+      ArrayList<String> singleQueue = new ArrayList<>();
+      singleQueue.add("my-topic-shared-subscription_3:global");
+      singleQueue.add("IDONTEXIST");
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.TARGET_QUEUES.toString(),
 singleQueue);
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.INTERNAL_ID.toString(),
 a1.getStorageManager().generateID());
+      
message.setDeliveryAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString(), 
String.valueOf(b1.getNodeID())); // simulating a node from b1, so it is not 
sent back to b1
+
+      sender.send(message);
+
+      for (int i = 0; i < 10; i++) {
+         String name = "my-topic-shared-subscription_" + i + ":global";
+
+         if (i == 3) {
+            // only this queue, on this server should have received a message
+            // it shouldn't also be mirrored to its replica
+            Wait.assertEquals(1, a1.locateQueue(name)::getMessageCount);
+         } else {
+            Wait.assertEquals(0, a1.locateQueue(name)::getMessageCount);
+         }
+         Wait.assertEquals(0, b1.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
+      }
+
+   }
+
+
+
    // This test has distinct subscriptions on each node and it is making sure 
the Mirror Routing is working accurately
    @Test
    public void testMultiNodeSubscription() throws Exception {
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
index 33463ed3d1..58c0ddf30b 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
@@ -434,7 +434,7 @@ public class WildcardAddressManagerUnitTest extends 
ActiveMQTestBase {
    static class BindingsFake implements Bindings {
 
       SimpleString name;
-      ConcurrentHashMap<SimpleString, Binding> bindings = new 
ConcurrentHashMap<>();
+      ConcurrentHashMap<String, Binding> bindings = new ConcurrentHashMap<>();
 
       BindingsFake(SimpleString address) {
          this.name = address;
@@ -447,12 +447,12 @@ public class WildcardAddressManagerUnitTest extends 
ActiveMQTestBase {
 
       @Override
       public void addBinding(Binding binding) {
-         bindings.put(binding.getUniqueName(), binding);
+         bindings.put(String.valueOf(binding.getUniqueName()), binding);
       }
 
       @Override
       public Binding removeBindingByUniqueName(SimpleString uniqueName) {
-         return bindings.remove(uniqueName);
+         return bindings.remove(String.valueOf(uniqueName));
       }
 
       @Override
@@ -467,11 +467,11 @@ public class WildcardAddressManagerUnitTest extends 
ActiveMQTestBase {
 
       @Override
       public Binding getBinding(String name) {
-         return null;
+         return bindings.get(name);
       }
 
       @Override
-      public void forEach(BiConsumer<SimpleString, Binding> bindingConsumer) {
+      public void forEach(BiConsumer<String, Binding> bindingConsumer) {
          bindings.forEach(bindingConsumer);
       }
 

Reply via email to