gemmellr commented on code in PR #5220:
URL: https://github.com/apache/activemq-artemis/pull/5220#discussion_r1869366885


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java:
##########
@@ -75,6 +77,15 @@ public AMQPMirrorBrokerConnectionElement 
setQueueCreation(boolean queueCreation)
       return this;
    }
 
+   public boolean getNoForward() {

Review Comment:
   isNoForward would be more typical, and more consistent with the others 
around it.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -186,6 +191,43 @@ public void 
testBrokerHandlesSenderLinkOmitsMirrorCapability() throws Exception
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void testBrokerHandlesSenderLinkOmitsNoForwardCapability() throws 
Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofSender()
+            .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
+            .withDesiredCapabilities("amq.mirror", 
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString(), 
AMQPMirrorControllerSource.NO_FORWARD.toString())
+            .respond()
+            .withOfferedCapabilities("amq.mirror", 
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString());

Review Comment:
   Could use the constants for both rather than just one.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java:
##########
@@ -428,6 +428,12 @@ public HandleStatus handle(final MessageReference ref) 
throws Exception {
 
          return HandleStatus.NO_MATCH;
       }
+      if (callback != null && callback.filterRef(ref, 
ServerConsumerImpl.this)) {
+         if (logger.isDebugEnabled()) {
+            // TODO
+         }
+         return HandleStatus.NO_MATCH;

Review Comment:
   Though different than the previous drop-inside-consumer approach, which also 
never actually sends the message, doing this instead still similarly means 
nothing evers consume the message. So its still just going to stay there 
forever (difference this time is, it wont be in a zombie-delivering state). 
Something will need to be done with the message.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws 
Exception {
       }
    }
 
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", 100 * 1024);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, true);
+   }
+
+   private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean 
tunneling, boolean longMessage) throws Exception {
+      final Map<String, Object> brokerProperties = new HashMap<>();
+      brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), 
"Test-Broker");
+
+      final String[] capabilities;
+      ArrayList<String> capabilitiesList = new ArrayList<>();
+      int messageFormat = 0;
+
+      capabilitiesList.add("amq.mirror");

Review Comment:
   similarly



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws 
Exception {
       }
    }
 
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", 100 * 1024);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, true);
+   }
+
+   private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean 
tunneling, boolean longMessage) throws Exception {
+      final Map<String, Object> brokerProperties = new HashMap<>();
+      brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), 
"Test-Broker");
+
+      final String[] capabilities;
+      ArrayList<String> capabilitiesList = new ArrayList<>();
+      int messageFormat = 0;
+
+      capabilitiesList.add("amq.mirror");
+      if (tunneling) {
+         
capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString());
+         messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
+      }
+      capabilities = capabilitiesList.toArray(new String[]{});
+
+      // Topology of the test: server -(noForward)-> server_2 -> peer_3
+      try (ProtonTestServer peer_3 = new ProtonTestServer()) {
+         peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
+         peer_3.expectOpen().respond();
+         peer_3.expectBegin().respond();
+         peer_3.expectAttach().ofSender()
+            .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
+            .withDesiredCapabilities(capabilities)
+            .respond()
+            .withOfferedCapabilities(capabilities)
+            .withPropertiesMap(brokerProperties);
+         peer_3.remoteFlow().withLinkCredit(10).queue();
+         peer_3.start();
+
+         final URI remoteURI = peer_3.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         final int AMQP_PORT_2 = BROKER_PORT_NUM + 1;
+         final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false);
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+            amqpConnection.setReconnectAttempts(0);// No reconnects
+            amqpConnection.setUser("user");
+            amqpConnection.setPassword("pass");
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, 
Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName()
 + ",sometest"));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+            amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", 
"tcp://localhost:" + BROKER_PORT_NUM);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", 
"tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true));
+            server.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         // connect the topology
+         server_2.start();
+         server.start();
+         peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Create queues & send messages on server, nothing will reach peer_3
+         createAddressAndQueues(server);
+         Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+         Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+
+         final org.apache.activemq.artemis.core.server.Queue q1 = 
server.locateQueue(getQueueName());
+         assertNotNull(q1);
+
+         final org.apache.activemq.artemis.core.server.Queue q2 = 
server_2.locateQueue(getQueueName());
+         assertNotNull(q2);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM);
+         final ConnectionFactory factory_2 = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2);
+
+         String message = "A message!";
+         if (longMessage) {
+            String sLongMessage = "";
+            for (int i = 0; i < 11 * 1014; i++) {
+               sLongMessage += message;
+            }
+            message = sLongMessage;
+         }

Review Comment:
   Probably worth just using a utility method that takes a length and hits it 
and returns the value. Then call it passing some value based on the configured 
threshold (which coupled with last comment, could reference a variable to make 
things clearer later). Even now, the use of 11 * 1014 isnt exactly obvious.
   
   Though its only a test, probably nicer to use a StringBuilder too than 
concat.
   
   



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class AMQPNoForwardMirroringTest extends AmqpClientTestSupport {

Review Comment:
   Most of the tests added in the previous/existing class, would actually be 
obvious candidates for a class with this name. Perhaps a helpful comment here 
noting their presence in the other class and to also see it.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws 
Exception {
       }
    }
 
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", 100 * 1024);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, true);
+   }
+
+   private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean 
tunneling, boolean longMessage) throws Exception {
+      final Map<String, Object> brokerProperties = new HashMap<>();
+      brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), 
"Test-Broker");
+
+      final String[] capabilities;
+      ArrayList<String> capabilitiesList = new ArrayList<>();
+      int messageFormat = 0;
+
+      capabilitiesList.add("amq.mirror");
+      if (tunneling) {
+         
capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString());
+         messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
+      }
+      capabilities = capabilitiesList.toArray(new String[]{});
+
+      // Topology of the test: server -(noForward)-> server_2 -> peer_3
+      try (ProtonTestServer peer_3 = new ProtonTestServer()) {
+         peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
+         peer_3.expectOpen().respond();
+         peer_3.expectBegin().respond();
+         peer_3.expectAttach().ofSender()
+            .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
+            .withDesiredCapabilities(capabilities)
+            .respond()
+            .withOfferedCapabilities(capabilities)
+            .withPropertiesMap(brokerProperties);
+         peer_3.remoteFlow().withLinkCredit(10).queue();
+         peer_3.start();
+
+         final URI remoteURI = peer_3.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         final int AMQP_PORT_2 = BROKER_PORT_NUM + 1;
+         final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false);
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+            amqpConnection.setReconnectAttempts(0);// No reconnects
+            amqpConnection.setUser("user");
+            amqpConnection.setPassword("pass");
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, 
Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName()
 + ",sometest"));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+            amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", 
"tcp://localhost:" + BROKER_PORT_NUM);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", 
"tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50);

Review Comment:
   Should put a bound on the reconnects based on some reasonable overall 
timeout.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws 
Exception {
       }
    }
 
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", 100 * 1024);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, true);
+   }
+
+   private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean 
tunneling, boolean longMessage) throws Exception {
+      final Map<String, Object> brokerProperties = new HashMap<>();
+      brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), 
"Test-Broker");
+
+      final String[] capabilities;
+      ArrayList<String> capabilitiesList = new ArrayList<>();
+      int messageFormat = 0;
+
+      capabilitiesList.add("amq.mirror");
+      if (tunneling) {
+         
capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString());
+         messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
+      }
+      capabilities = capabilitiesList.toArray(new String[]{});
+
+      // Topology of the test: server -(noForward)-> server_2 -> peer_3
+      try (ProtonTestServer peer_3 = new ProtonTestServer()) {
+         peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
+         peer_3.expectOpen().respond();
+         peer_3.expectBegin().respond();
+         peer_3.expectAttach().ofSender()
+            .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
+            .withDesiredCapabilities(capabilities)
+            .respond()
+            .withOfferedCapabilities(capabilities)
+            .withPropertiesMap(brokerProperties);
+         peer_3.remoteFlow().withLinkCredit(10).queue();
+         peer_3.start();
+
+         final URI remoteURI = peer_3.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         final int AMQP_PORT_2 = BROKER_PORT_NUM + 1;
+         final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false);
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+            amqpConnection.setReconnectAttempts(0);// No reconnects
+            amqpConnection.setUser("user");
+            amqpConnection.setPassword("pass");
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, 
Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName()
 + ",sometest"));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+            amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", 
"tcp://localhost:" + BROKER_PORT_NUM);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", 
"tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true));
+            server.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         // connect the topology
+         server_2.start();
+         server.start();
+         peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Create queues & send messages on server, nothing will reach peer_3
+         createAddressAndQueues(server);
+         Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+         Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+
+         final org.apache.activemq.artemis.core.server.Queue q1 = 
server.locateQueue(getQueueName());
+         assertNotNull(q1);
+
+         final org.apache.activemq.artemis.core.server.Queue q2 = 
server_2.locateQueue(getQueueName());
+         assertNotNull(q2);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM);
+         final ConnectionFactory factory_2 = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2);
+
+         String message = "A message!";
+         if (longMessage) {
+            String sLongMessage = "";
+            for (int i = 0; i < 11 * 1014; i++) {
+               sLongMessage += message;
+            }
+            message = sLongMessage;
+         }
+
+         try (Connection conn = factory.createConnection()) {
+            final Session session = conn.createSession();
+            conn.start();
+
+            final MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+            producer.send(session.createTextMessage(message));
+            producer.close();
+         }
+
+         org.apache.activemq.artemis.utils.Wait.assertEquals(1L, 
q1::getMessageCount, 100, 100);
+         org.apache.activemq.artemis.utils.Wait.assertEquals(1L, 
q2::getMessageCount, 100, 100);
+
+         try (Connection conn = factory_2.createConnection()) {
+            final Session session = conn.createSession();
+            conn.start();
+
+            final MessageConsumer consumer = 
session.createConsumer(session.createQueue(getQueueName()));
+            TextMessage rcvMsg = (TextMessage) consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals(message, rcvMsg.getText());
+            consumer.close();
+         }
+
+         org.apache.activemq.artemis.utils.Wait.assertEquals(0L, 
q2::getMessageCount, 100, 100);
+         org.apache.activemq.artemis.utils.Wait.assertEquals(0L, 
q1::getMessageCount, 6000, 100);
+
+         // give some time to peer_3 to receive messages (if any)
+         Thread.sleep(100);
+         peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); // if messages 
are received here, this should error out
+
+         // Then send messages on the broker directly connected to the peer, 
the messages should make it to the peer.
+         // Receiving these 3 messages in that order confirms that no previous 
data reched the Peer, therefore validating
+         // the test.
+         peer_3.expectTransfer().accept(); // Address create
+         peer_3.expectTransfer().accept(); // Queue create
+         peer_3.expectTransfer().withMessageFormat(messageFormat).accept(); // 
Producer Message

Review Comment:
   Could also verify the text body of the message to better validate things.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class AMQPSquareMirroringTest extends AmqpClientTestSupport {
+
+   protected static final int AMQP_PORT_2 = 5673;
+   protected static final int AMQP_PORT_3 = 5674;
+   protected static final int AMQP_PORT_4 = 5675;
+
+   ActiveMQServer server_2;
+   ActiveMQServer server_3;
+   ActiveMQServer server_4;
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(AMQP_PORT, false);
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,CORE,OPENWIRE";
+   }
+
+   @Test
+   public void testSquare() throws Exception {
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_3 = createServer(AMQP_PORT_3, false);
+      server_4 = createServer(AMQP_PORT_4, false);
+
+      // name the servers, for convenience during debugging
+      server.getConfiguration().setName("1");
+      server_2.getConfiguration().setName("2");
+      server_3.getConfiguration().setName("3");
+      server_4.getConfiguration().setName("4");
+
+      /**
+       *
+       * Setup the mirroring topology to be a square:
+       *
+       * 1 <- - -> 2
+       * ^         ^       The link between 1 and 2 and the
+       * |         |       link between 3 and 4 are noForward
+       * v         v       links in both directions.
+       * 4 <- - -> 3
+       */
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" 
+ AMQP_PORT_2).setRetryInterval(100);
+         amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setNoForward(true));
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "1to4", "tcp://localhost:" 
+ AMQP_PORT_4).setRetryInterval(100);
+         amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "2to1", "tcp://localhost:" 
+ AMQP_PORT).setRetryInterval(100);
+         amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setNoForward(true));
+         server_2.getConfiguration().addAMQPConnection(amqpConnection);
+         amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "2to3", "tcp://localhost:" 
+ AMQP_PORT_3).setRetryInterval(100);
+         amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
+         server_2.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "3to2", "tcp://localhost:" 
+ AMQP_PORT_2).setRetryInterval(100);
+         amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
+         server_3.getConfiguration().addAMQPConnection(amqpConnection);
+         amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "3to4", "tcp://localhost:" 
+ AMQP_PORT_4).setRetryInterval(100);
+         amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setNoForward(true));
+         server_3.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "4to1", "tcp://localhost:" 
+ AMQP_PORT).setRetryInterval(100);
+         amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
+         server_4.getConfiguration().addAMQPConnection(amqpConnection);
+         amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "4to3", "tcp://localhost:" 
+ AMQP_PORT_3).setRetryInterval(100);
+         amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setNoForward(true));
+         server_4.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+
+      server.start();
+      server_2.start();
+      server_3.start();
+      server_4.start();
+
+      createAddressAndQueues(server);
+      Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+      Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+      Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null);
+      Wait.assertTrue(() -> server_4.locateQueue(getQueueName()) != null);
+
+      Queue q1 = server.locateQueue(getQueueName());
+      assertNotNull(q1);
+
+      Queue q2 = server.locateQueue(getQueueName());
+      assertNotNull(q2);
+
+      Queue q3 = server.locateQueue(getQueueName());
+      assertNotNull(q3);
+
+      Queue q4 = server.locateQueue(getQueueName());
+      assertNotNull(q4);

Review Comment:
   These all use the same queue, as they all call on "server", so the rest of 
the test isnt validating what it thinks when it checks them all.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class AMQPNoForwardMirroringTest extends AmqpClientTestSupport {
+
+   protected static final int AMQP_PORT_2 = 5673;
+   protected static final int AMQP_PORT_3 = 5674;
+
+   ActiveMQServer server_2;
+   ActiveMQServer server_3;
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(AMQP_PORT, false);
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,CORE,OPENWIRE";
+   }
+
+   @Test
+   public void testNoForward() throws Exception {
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_3 = createServer(AMQP_PORT_3, false);
+
+      // name the servers, for convenience during debugging
+      server.getConfiguration().setName("1");
+      server_2.getConfiguration().setName("2");
+      server_3.getConfiguration().setName("3");
+
+      /**
+       *
+       * the mirroring topology:
+       *
+       * v---------|
+       * 1 ------> 2       The link between 1 and 2 is noForward=true
+       *           ^
+       *           v
+       *           3
+       */
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" 
+ AMQP_PORT_2).setRetryInterval(100);
+         amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true));
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "2to1", "tcp://localhost:" 
+ AMQP_PORT).setRetryInterval(100);
+         amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()));
+         server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+         amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "2to3", "tcp://localhost:" 
+ AMQP_PORT_3).setRetryInterval(100);
+         amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()));
+         server_2.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+
+      server.start();
+      server_2.start();
+      server_3.start();
+
+      createAddressAndQueues(server);
+      Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+      Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+      // queue creation doesn't reach 3 b/c of the noForward link between 1 
and 2.
+      Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) == null);
+
+      Queue q1 = server.locateQueue(getQueueName());
+      assertNotNull(q1);
+
+      Queue q2 = server_2.locateQueue(getQueueName());
+      assertNotNull(q2);
+
+      ConnectionFactory factory = 
CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + 
AMQP_PORT);
+      ConnectionFactory factory2 = 
CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + 
AMQP_PORT_2);
+      ConnectionFactory factory3 = 
CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + 
AMQP_PORT_3);
+
+      // send from 1, 2 receives, 3 don't.
+      try (Connection conn = factory.createConnection()) {
+         Session session = conn.createSession();
+         MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+         for (int i = 0; i < 10; i++) {
+            producer.send(session.createTextMessage("message " + i));
+         }
+      }
+      Wait.assertEquals(10L, q1::getMessageCount, 1000, 100);
+      Wait.assertEquals(10L, q2::getMessageCount, 1000, 100);
+
+      // consume from 2, 1 and 2 counters go back to 0
+      try (Connection conn = factory2.createConnection()) {
+         Session session = conn.createSession();
+         conn.start();
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(getQueueName()));
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = (TextMessage) consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals("message " + i, message.getText());
+         }
+      }
+
+      Wait.assertEquals(0L, q1::getMessageCount, 1000, 100);
+      Wait.assertEquals(0L, q2::getMessageCount, 1000, 100);
+
+      Thread.sleep(100); // some time to allow eventual loops
+
+      // queue creation was originated on server, with noForward in place,
+      // the messages never reached server_3, for the rest of the test suite,
+      // we need server_3 to have access to the queue
+      createAddressAndQueues(server_3);
+      Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null);
+      Queue q3 = server_3.locateQueue(getQueueName());
+      assertNotNull(q3);
+
+      // produce on 2. 1, 2 and 3 receive messages.
+      try (Connection conn = factory2.createConnection()) {
+         Session session = conn.createSession();
+         MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+         for (int i = 0; i < 10; i++) {
+            producer.send(session.createTextMessage("message " + i));
+         }
+      }
+
+      Wait.assertEquals(10L, q1::getMessageCount, 1000, 100);
+      Wait.assertEquals(10L, q2::getMessageCount, 1000, 100);
+      Wait.assertEquals(10L, q3::getMessageCount, 1000, 100);
+
+      // consume on 1. 1, 2, and 3 counters are back to 0
+      try (Connection conn = factory.createConnection()) {
+         Session session = conn.createSession();
+         conn.start();
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(getQueueName()));
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = (TextMessage) consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals("message " + i, message.getText());
+         }
+         consumer.close();
+      }
+      Wait.assertEquals(0L, q1::getMessageCount, 1000, 100);
+      Wait.assertEquals(0L, q2::getMessageCount, 1000, 100);
+      Wait.assertEquals(0L, q3::getMessageCount, 1000, 100);
+
+      // produce on 2. 1, 2 and 3 receive messages.
+      try (Connection conn = factory2.createConnection()) {
+         Session session = conn.createSession();
+         MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+         for (int i = 0; i < 10; i++) {
+            producer.send(session.createTextMessage("message " + i));
+         }
+      }
+
+      Wait.assertEquals(10L, q1::getMessageCount, 1000, 100);
+      Wait.assertEquals(10L, q2::getMessageCount, 1000, 100);
+      Wait.assertEquals(10L, q3::getMessageCount, 1000, 100);
+
+      // consume on 3. 1, 2 counters are still at 10 and 3 is at 0.
+      try (Connection conn = factory3.createConnection()) {
+         Session session = conn.createSession();
+         conn.start();
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(getQueueName()));
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = (TextMessage) consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals("message " + i, message.getText());
+         }
+         consumer.close();
+      }
+      Wait.assertEquals(10L, q1::getMessageCount, 1000, 100);
+      Wait.assertEquals(10L, q2::getMessageCount, 1000, 100);
+      Wait.assertEquals(0L, q3::getMessageCount, 1000, 100);
+
+      // consume on 2. 1, 2 and 3 counters are back to 0
+      try (Connection conn = factory2.createConnection()) {
+         Session session = conn.createSession();
+         conn.start();
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(getQueueName()));
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = (TextMessage) consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals("message " + i, message.getText());
+         }
+         consumer.close();
+      }
+      Wait.assertEquals(0L, q1::getMessageCount, 1000, 100);
+      Wait.assertEquals(0L, q2::getMessageCount, 1000, 100);
+      Wait.assertEquals(0L, q3::getMessageCount, 1000, 100);
+
+      // produce on 3. only 3 has messages.
+      try (Connection conn = factory3.createConnection()) {
+         Session session = conn.createSession();
+         MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+         for (int i = 0; i < 10; i++) {
+            producer.send(session.createTextMessage("message " + i));
+         }
+      }
+
+      Wait.assertEquals(0L, q1::getMessageCount, 1000, 100);
+      Wait.assertEquals(0L, q2::getMessageCount, 1000, 100);
+      Wait.assertEquals(10L, q3::getMessageCount, 1000, 100);
+
+      // consume on 3. 1, 2, and 3 counters are back to 0
+      try (Connection conn = factory3.createConnection()) {
+         Session session = conn.createSession();
+         conn.start();
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(getQueueName()));
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = (TextMessage) consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals("message " + i, message.getText());
+         }
+         consumer.close();
+      }
+      Wait.assertEquals(0L, q1::getMessageCount, 1000, 100);
+      Wait.assertEquals(0L, q2::getMessageCount, 1000, 100);
+      Wait.assertEquals(0L, q3::getMessageCount, 1000, 100);
+
+      try (Connection conn = factory.createConnection()) {
+         Session session = conn.createSession();
+         conn.start();
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(getQueueName()));
+         assertNull(consumer.receiveNoWait());
+         consumer.close();
+      }
+
+      try (Connection conn = factory2.createConnection()) {
+         Session session = conn.createSession();
+         conn.start();
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(getQueueName()));
+         assertNull(consumer.receiveNoWait());
+         consumer.close();
+      }
+
+      try (Connection conn = factory3.createConnection()) {
+         Session session = conn.createSession();
+         conn.start();
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(getQueueName()));
+         assertNull(consumer.receiveNoWait());
+         consumer.close();
+      }

Review Comment:
   There are a lot of repeated creating new connections to the same server. I'd 
be inclined to have a larger try-with-resources that created the connections 
and then [re]used them as needed.
   
   Not necessarily for the whole test, but where it makes sense and makes 
things succinct and more efficient without any notable change in behaviour. E.g 
the 'consume from 1, from 2, from 3....then do the same again to check there is 
still nothing' all seems like it would be functionally the same without the 
extra connection creations



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class AMQPSquareMirroringTest extends AmqpClientTestSupport {
+
+   protected static final int AMQP_PORT_2 = 5673;
+   protected static final int AMQP_PORT_3 = 5674;
+   protected static final int AMQP_PORT_4 = 5675;
+
+   ActiveMQServer server_2;
+   ActiveMQServer server_3;
+   ActiveMQServer server_4;
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(AMQP_PORT, false);
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,CORE,OPENWIRE";
+   }
+
+   @Test
+   public void testSquare() throws Exception {
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_3 = createServer(AMQP_PORT_3, false);
+      server_4 = createServer(AMQP_PORT_4, false);
+
+      // name the servers, for convenience during debugging
+      server.getConfiguration().setName("1");
+      server_2.getConfiguration().setName("2");
+      server_3.getConfiguration().setName("3");
+      server_4.getConfiguration().setName("4");
+
+      /**
+       *
+       * Setup the mirroring topology to be a square:
+       *
+       * 1 <- - -> 2
+       * ^         ^       The link between 1 and 2 and the
+       * |         |       link between 3 and 4 are noForward
+       * v         v       links in both directions.
+       * 4 <- - -> 3
+       */
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" 
+ AMQP_PORT_2).setRetryInterval(100);

Review Comment:
   It would probably be more readable with connection-specific variable names 
rather than reusing (here and other test classes). There are 8 different 
connections, with different configurations on top, all with name 
amqpConnection. There isnt even a space between the 2 in each block here.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws 
Exception {
       }
    }
 
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", 100 * 1024);

Review Comment:
   Could be worth adding a field that can be referenced for later clarity.
   
   Though now that I look at the rest of the test, I'd note that this has no 
effect on Core messages, the Core client decides what is large/not with 
client-side config, so might be worth configuring the Core clients also.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws 
Exception {
       }
    }
 
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", 100 * 1024);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, true);
+   }
+
+   private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean 
tunneling, boolean longMessage) throws Exception {
+      final Map<String, Object> brokerProperties = new HashMap<>();
+      brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), 
"Test-Broker");
+
+      final String[] capabilities;
+      ArrayList<String> capabilitiesList = new ArrayList<>();
+      int messageFormat = 0;
+
+      capabilitiesList.add("amq.mirror");
+      if (tunneling) {
+         
capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString());
+         messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
+      }
+      capabilities = capabilitiesList.toArray(new String[]{});
+
+      // Topology of the test: server -(noForward)-> server_2 -> peer_3

Review Comment:
   This doesnt match what happens below, server 2 has 2 mirrors.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws 
Exception {
       }
    }
 
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", 100 * 1024);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, true);
+   }
+
+   private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean 
tunneling, boolean longMessage) throws Exception {
+      final Map<String, Object> brokerProperties = new HashMap<>();
+      brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), 
"Test-Broker");
+
+      final String[] capabilities;
+      ArrayList<String> capabilitiesList = new ArrayList<>();
+      int messageFormat = 0;
+
+      capabilitiesList.add("amq.mirror");
+      if (tunneling) {
+         
capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString());
+         messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
+      }
+      capabilities = capabilitiesList.toArray(new String[]{});
+
+      // Topology of the test: server -(noForward)-> server_2 -> peer_3
+      try (ProtonTestServer peer_3 = new ProtonTestServer()) {
+         peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
+         peer_3.expectOpen().respond();
+         peer_3.expectBegin().respond();
+         peer_3.expectAttach().ofSender()
+            .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
+            .withDesiredCapabilities(capabilities)
+            .respond()
+            .withOfferedCapabilities(capabilities)
+            .withPropertiesMap(brokerProperties);
+         peer_3.remoteFlow().withLinkCredit(10).queue();
+         peer_3.start();
+
+         final URI remoteURI = peer_3.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         final int AMQP_PORT_2 = BROKER_PORT_NUM + 1;
+         final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false);
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+            amqpConnection.setReconnectAttempts(0);// No reconnects
+            amqpConnection.setUser("user");
+            amqpConnection.setPassword("pass");
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, 
Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName()
 + ",sometest"));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+            amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", 
"tcp://localhost:" + BROKER_PORT_NUM);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", 
"tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true));
+            server.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         // connect the topology
+         server_2.start();
+         server.start();
+         peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Create queues & send messages on server, nothing will reach peer_3
+         createAddressAndQueues(server);
+         Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+         Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+
+         final org.apache.activemq.artemis.core.server.Queue q1 = 
server.locateQueue(getQueueName());
+         assertNotNull(q1);
+
+         final org.apache.activemq.artemis.core.server.Queue q2 = 
server_2.locateQueue(getQueueName());
+         assertNotNull(q2);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM);
+         final ConnectionFactory factory_2 = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2);
+
+         String message = "A message!";
+         if (longMessage) {
+            String sLongMessage = "";
+            for (int i = 0; i < 11 * 1014; i++) {
+               sLongMessage += message;
+            }
+            message = sLongMessage;
+         }
+
+         try (Connection conn = factory.createConnection()) {
+            final Session session = conn.createSession();
+            conn.start();
+
+            final MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+            producer.send(session.createTextMessage(message));
+            producer.close();
+         }
+
+         org.apache.activemq.artemis.utils.Wait.assertEquals(1L, 
q1::getMessageCount, 100, 100);
+         org.apache.activemq.artemis.utils.Wait.assertEquals(1L, 
q2::getMessageCount, 100, 100);
+
+         try (Connection conn = factory_2.createConnection()) {
+            final Session session = conn.createSession();
+            conn.start();
+
+            final MessageConsumer consumer = 
session.createConsumer(session.createQueue(getQueueName()));
+            TextMessage rcvMsg = (TextMessage) consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals(message, rcvMsg.getText());
+            consumer.close();
+         }
+
+         org.apache.activemq.artemis.utils.Wait.assertEquals(0L, 
q2::getMessageCount, 100, 100);
+         org.apache.activemq.artemis.utils.Wait.assertEquals(0L, 
q1::getMessageCount, 6000, 100);

Review Comment:
   Ditto



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws 
Exception {
       }
    }
 
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", 100 * 1024);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, true);
+   }
+
+   private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean 
tunneling, boolean longMessage) throws Exception {
+      final Map<String, Object> brokerProperties = new HashMap<>();
+      brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), 
"Test-Broker");
+
+      final String[] capabilities;
+      ArrayList<String> capabilitiesList = new ArrayList<>();
+      int messageFormat = 0;
+
+      capabilitiesList.add("amq.mirror");
+      if (tunneling) {
+         
capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString());
+         messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
+      }
+      capabilities = capabilitiesList.toArray(new String[]{});
+
+      // Topology of the test: server -(noForward)-> server_2 -> peer_3
+      try (ProtonTestServer peer_3 = new ProtonTestServer()) {
+         peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
+         peer_3.expectOpen().respond();
+         peer_3.expectBegin().respond();
+         peer_3.expectAttach().ofSender()
+            .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
+            .withDesiredCapabilities(capabilities)
+            .respond()
+            .withOfferedCapabilities(capabilities)
+            .withPropertiesMap(brokerProperties);
+         peer_3.remoteFlow().withLinkCredit(10).queue();
+         peer_3.start();
+
+         final URI remoteURI = peer_3.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         final int AMQP_PORT_2 = BROKER_PORT_NUM + 1;
+         final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false);
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+            amqpConnection.setReconnectAttempts(0);// No reconnects
+            amqpConnection.setUser("user");
+            amqpConnection.setPassword("pass");
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, 
Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName()
 + ",sometest"));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+            amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", 
"tcp://localhost:" + BROKER_PORT_NUM);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", 
"tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true));
+            server.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         // connect the topology
+         server_2.start();
+         server.start();
+         peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Create queues & send messages on server, nothing will reach peer_3
+         createAddressAndQueues(server);
+         Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+         Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+
+         final org.apache.activemq.artemis.core.server.Queue q1 = 
server.locateQueue(getQueueName());
+         assertNotNull(q1);
+
+         final org.apache.activemq.artemis.core.server.Queue q2 = 
server_2.locateQueue(getQueueName());
+         assertNotNull(q2);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM);
+         final ConnectionFactory factory_2 = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2);
+
+         String message = "A message!";
+         if (longMessage) {
+            String sLongMessage = "";
+            for (int i = 0; i < 11 * 1014; i++) {
+               sLongMessage += message;
+            }
+            message = sLongMessage;
+         }
+
+         try (Connection conn = factory.createConnection()) {
+            final Session session = conn.createSession();
+            conn.start();
+
+            final MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+            producer.send(session.createTextMessage(message));
+            producer.close();
+         }
+
+         org.apache.activemq.artemis.utils.Wait.assertEquals(1L, 
q1::getMessageCount, 100, 100);
+         org.apache.activemq.artemis.utils.Wait.assertEquals(1L, 
q2::getMessageCount, 100, 100);

Review Comment:
   Doesnt seem a need for the fully qualified name, earlier in the method is 
using just Wait.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws 
Exception {
       }
    }
 
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", 100 * 1024);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, true);
+   }
+
+   private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean 
tunneling, boolean longMessage) throws Exception {
+      final Map<String, Object> brokerProperties = new HashMap<>();
+      brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), 
"Test-Broker");
+
+      final String[] capabilities;
+      ArrayList<String> capabilitiesList = new ArrayList<>();
+      int messageFormat = 0;
+
+      capabilitiesList.add("amq.mirror");
+      if (tunneling) {
+         
capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString());
+         messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
+      }
+      capabilities = capabilitiesList.toArray(new String[]{});
+
+      // Topology of the test: server -(noForward)-> server_2 -> peer_3
+      try (ProtonTestServer peer_3 = new ProtonTestServer()) {
+         peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
+         peer_3.expectOpen().respond();
+         peer_3.expectBegin().respond();
+         peer_3.expectAttach().ofSender()
+            .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
+            .withDesiredCapabilities(capabilities)
+            .respond()
+            .withOfferedCapabilities(capabilities)
+            .withPropertiesMap(brokerProperties);
+         peer_3.remoteFlow().withLinkCredit(10).queue();
+         peer_3.start();
+
+         final URI remoteURI = peer_3.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         final int AMQP_PORT_2 = BROKER_PORT_NUM + 1;
+         final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false);
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+            amqpConnection.setReconnectAttempts(0);// No reconnects
+            amqpConnection.setUser("user");
+            amqpConnection.setPassword("pass");
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, 
Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName()
 + ",sometest"));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+            amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", 
"tcp://localhost:" + BROKER_PORT_NUM);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", 
"tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true));
+            server.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         // connect the topology
+         server_2.start();
+         server.start();
+         peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Create queues & send messages on server, nothing will reach peer_3
+         createAddressAndQueues(server);
+         Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+         Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+
+         final org.apache.activemq.artemis.core.server.Queue q1 = 
server.locateQueue(getQueueName());
+         assertNotNull(q1);
+
+         final org.apache.activemq.artemis.core.server.Queue q2 = 
server_2.locateQueue(getQueueName());
+         assertNotNull(q2);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM);
+         final ConnectionFactory factory_2 = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2);

Review Comment:
   Probably worth renaming the test to make clear it is Core large messages 
being sent originally. Equally, if controlling the AMQP large message size 
(which here, will only matter for the non-tunnelled case as mirrored messages 
arrive at server 2 via server 1) then it probably makes sense to configure the 
Core client large message size (via its URI, e.g add 
"?minLargeMessageSize=\<bytes-size\>"
   
   Probably worth having tests with large AMQP messages also. 



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws 
Exception {
       }
    }
 
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", 100 * 1024);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, true);
+   }
+
+   private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean 
tunneling, boolean longMessage) throws Exception {
+      final Map<String, Object> brokerProperties = new HashMap<>();
+      brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), 
"Test-Broker");
+
+      final String[] capabilities;
+      ArrayList<String> capabilitiesList = new ArrayList<>();
+      int messageFormat = 0;
+
+      capabilitiesList.add("amq.mirror");
+      if (tunneling) {
+         
capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString());
+         messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
+      }
+      capabilities = capabilitiesList.toArray(new String[]{});
+
+      // Topology of the test: server -(noForward)-> server_2 -> peer_3
+      try (ProtonTestServer peer_3 = new ProtonTestServer()) {
+         peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
+         peer_3.expectOpen().respond();
+         peer_3.expectBegin().respond();
+         peer_3.expectAttach().ofSender()
+            .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
+            .withDesiredCapabilities(capabilities)
+            .respond()
+            .withOfferedCapabilities(capabilities)
+            .withPropertiesMap(brokerProperties);
+         peer_3.remoteFlow().withLinkCredit(10).queue();
+         peer_3.start();
+
+         final URI remoteURI = peer_3.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         final int AMQP_PORT_2 = BROKER_PORT_NUM + 1;
+         final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false);
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+            amqpConnection.setReconnectAttempts(0);// No reconnects
+            amqpConnection.setUser("user");
+            amqpConnection.setPassword("pass");
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, 
Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName()
 + ",sometest"));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+            amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", 
"tcp://localhost:" + BROKER_PORT_NUM);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", 
"tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true));
+            server.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         // connect the topology
+         server_2.start();
+         server.start();
+         peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Create queues & send messages on server, nothing will reach peer_3
+         createAddressAndQueues(server);
+         Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+         Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+
+         final org.apache.activemq.artemis.core.server.Queue q1 = 
server.locateQueue(getQueueName());
+         assertNotNull(q1);
+
+         final org.apache.activemq.artemis.core.server.Queue q2 = 
server_2.locateQueue(getQueueName());
+         assertNotNull(q2);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM);
+         final ConnectionFactory factory_2 = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2);
+
+         String message = "A message!";
+         if (longMessage) {
+            String sLongMessage = "";
+            for (int i = 0; i < 11 * 1014; i++) {
+               sLongMessage += message;
+            }
+            message = sLongMessage;
+         }
+
+         try (Connection conn = factory.createConnection()) {
+            final Session session = conn.createSession();
+            conn.start();
+
+            final MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+            producer.send(session.createTextMessage(message));
+            producer.close();
+         }
+
+         org.apache.activemq.artemis.utils.Wait.assertEquals(1L, 
q1::getMessageCount, 100, 100);
+         org.apache.activemq.artemis.utils.Wait.assertEquals(1L, 
q2::getMessageCount, 100, 100);
+
+         try (Connection conn = factory_2.createConnection()) {
+            final Session session = conn.createSession();
+            conn.start();
+
+            final MessageConsumer consumer = 
session.createConsumer(session.createQueue(getQueueName()));
+            TextMessage rcvMsg = (TextMessage) consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals(message, rcvMsg.getText());
+            consumer.close();
+         }
+
+         org.apache.activemq.artemis.utils.Wait.assertEquals(0L, 
q2::getMessageCount, 100, 100);
+         org.apache.activemq.artemis.utils.Wait.assertEquals(0L, 
q1::getMessageCount, 6000, 100);
+
+         // give some time to peer_3 to receive messages (if any)
+         Thread.sleep(100);
+         peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); // if messages 
are received here, this should error out
+
+         // Then send messages on the broker directly connected to the peer, 
the messages should make it to the peer.
+         // Receiving these 3 messages in that order confirms that no previous 
data reched the Peer, therefore validating
+         // the test.
+         peer_3.expectTransfer().accept(); // Address create
+         peer_3.expectTransfer().accept(); // Queue create
+         peer_3.expectTransfer().withMessageFormat(messageFormat).accept(); // 
Producer Message
+
+         server_2.addAddressInfo(new AddressInfo(SimpleString.of("sometest"), 
RoutingType.ANYCAST));
+         
server_2.createQueue(QueueConfiguration.of("sometest").setRoutingType(RoutingType.ANYCAST));
+
+         try (Connection connection = factory_2.createConnection()) {
+            final Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createQueue("sometest"));
+            final TextMessage msg = session.createTextMessage("test");
+
+            connection.start();
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

Review Comment:
   This is the default, can be dropped for conciseness. Dont actually need to 
start a producer-only connection either.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class AMQPNoForwardMirroringTest extends AmqpClientTestSupport {
+
+   protected static final int AMQP_PORT_2 = 5673;
+   protected static final int AMQP_PORT_3 = 5674;
+
+   ActiveMQServer server_2;
+   ActiveMQServer server_3;
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(AMQP_PORT, false);
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,CORE,OPENWIRE";
+   }
+
+   @Test
+   public void testNoForward() throws Exception {
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_3 = createServer(AMQP_PORT_3, false);
+
+      // name the servers, for convenience during debugging
+      server.getConfiguration().setName("1");
+      server_2.getConfiguration().setName("2");
+      server_3.getConfiguration().setName("3");
+
+      /**
+       *
+       * the mirroring topology:
+       *
+       * v---------|
+       * 1 ------> 2       The link between 1 and 2 is noForward=true

Review Comment:
   Adding another linebetween these, to allow a more obvious [downward] side 
here would probably make this clearer. You could also make it wider and embed 
the (noForward) in the there like the other test class comment.
   
   Its also inconsistent with its use of line types denoting the noForward 
config in the other later test class, would be good to make them the classes 
illustrate things the same way for clarity.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -229,6 +271,173 @@ public void testBrokerAddsAddressAndQueue() throws 
Exception {
       }
    }
 
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", 100 * 1024);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, false);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testNoForwardBlocksLargeMessagesAndControlsPropagation() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(false, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testNoForwardBlocksLargeMessagesAndControlsPropagationWithTunneling() throws 
Exception {
+      doTestNoForwardBlocksMessagesAndControlsPropagation(true, true);
+   }
+
+   private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean 
tunneling, boolean longMessage) throws Exception {
+      final Map<String, Object> brokerProperties = new HashMap<>();
+      brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), 
"Test-Broker");
+
+      final String[] capabilities;
+      ArrayList<String> capabilitiesList = new ArrayList<>();
+      int messageFormat = 0;
+
+      capabilitiesList.add("amq.mirror");
+      if (tunneling) {
+         
capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString());
+         messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
+      }
+      capabilities = capabilitiesList.toArray(new String[]{});
+
+      // Topology of the test: server -(noForward)-> server_2 -> peer_3
+      try (ProtonTestServer peer_3 = new ProtonTestServer()) {
+         peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
+         peer_3.expectOpen().respond();
+         peer_3.expectBegin().respond();
+         peer_3.expectAttach().ofSender()
+            .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
+            .withDesiredCapabilities(capabilities)
+            .respond()
+            .withOfferedCapabilities(capabilities)
+            .withPropertiesMap(brokerProperties);
+         peer_3.remoteFlow().withLinkCredit(10).queue();
+         peer_3.start();
+
+         final URI remoteURI = peer_3.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         final int AMQP_PORT_2 = BROKER_PORT_NUM + 1;
+         final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false);
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+            amqpConnection.setReconnectAttempts(0);// No reconnects
+            amqpConnection.setUser("user");
+            amqpConnection.setPassword("pass");
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, 
Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName()
 + ",sometest"));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+            amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", 
"tcp://localhost:" + BROKER_PORT_NUM);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()));
+            server_2.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         {
+            AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", 
"tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50);
+            amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true));
+            server.getConfiguration().addAMQPConnection(amqpConnection);
+         }
+
+         // connect the topology
+         server_2.start();
+         server.start();
+         peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Create queues & send messages on server, nothing will reach peer_3
+         createAddressAndQueues(server);
+         Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
+         Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
+
+         final org.apache.activemq.artemis.core.server.Queue q1 = 
server.locateQueue(getQueueName());
+         assertNotNull(q1);
+
+         final org.apache.activemq.artemis.core.server.Queue q2 = 
server_2.locateQueue(getQueueName());
+         assertNotNull(q2);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM);
+         final ConnectionFactory factory_2 = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2);
+
+         String message = "A message!";
+         if (longMessage) {
+            String sLongMessage = "";
+            for (int i = 0; i < 11 * 1014; i++) {
+               sLongMessage += message;
+            }
+            message = sLongMessage;
+         }
+
+         try (Connection conn = factory.createConnection()) {
+            final Session session = conn.createSession();
+            conn.start();
+
+            final MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
+            producer.send(session.createTextMessage(message));
+            producer.close();
+         }
+
+         org.apache.activemq.artemis.utils.Wait.assertEquals(1L, 
q1::getMessageCount, 100, 100);
+         org.apache.activemq.artemis.utils.Wait.assertEquals(1L, 
q2::getMessageCount, 100, 100);
+
+         try (Connection conn = factory_2.createConnection()) {
+            final Session session = conn.createSession();
+            conn.start();
+
+            final MessageConsumer consumer = 
session.createConsumer(session.createQueue(getQueueName()));
+            TextMessage rcvMsg = (TextMessage) consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals(message, rcvMsg.getText());
+            consumer.close();
+         }
+
+         org.apache.activemq.artemis.utils.Wait.assertEquals(0L, 
q2::getMessageCount, 100, 100);
+         org.apache.activemq.artemis.utils.Wait.assertEquals(0L, 
q1::getMessageCount, 6000, 100);
+
+         // give some time to peer_3 to receive messages (if any)
+         Thread.sleep(100);
+         peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); // if messages 
are received here, this should error out
+
+         // Then send messages on the broker directly connected to the peer, 
the messages should make it to the peer.
+         // Receiving these 3 messages in that order confirms that no previous 
data reched the Peer, therefore validating
+         // the test.
+         peer_3.expectTransfer().accept(); // Address create
+         peer_3.expectTransfer().accept(); // Queue create
+         peer_3.expectTransfer().withMessageFormat(messageFormat).accept(); // 
Producer Message
+
+         server_2.addAddressInfo(new AddressInfo(SimpleString.of("sometest"), 
RoutingType.ANYCAST));
+         
server_2.createQueue(QueueConfiguration.of("sometest").setRoutingType(RoutingType.ANYCAST));

Review Comment:
   could probably use something more descriptive than "sometest". Either way, a 
variable would probably make it easier to follow where it is used, e.g in the 
mirror config for filtering, and again here/below.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class AMQPSquareMirroringTest extends AmqpClientTestSupport {
+
+   protected static final int AMQP_PORT_2 = 5673;
+   protected static final int AMQP_PORT_3 = 5674;
+   protected static final int AMQP_PORT_4 = 5675;
+
+   ActiveMQServer server_2;
+   ActiveMQServer server_3;
+   ActiveMQServer server_4;
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(AMQP_PORT, false);
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,CORE,OPENWIRE";
+   }
+
+   @Test
+   public void testSquare() throws Exception {
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_3 = createServer(AMQP_PORT_3, false);
+      server_4 = createServer(AMQP_PORT_4, false);
+
+      // name the servers, for convenience during debugging
+      server.getConfiguration().setName("1");
+      server_2.getConfiguration().setName("2");
+      server_3.getConfiguration().setName("3");
+      server_4.getConfiguration().setName("4");
+
+      /**
+       *
+       * Setup the mirroring topology to be a square:
+       *
+       * 1 <- - -> 2
+       * ^         ^       The link between 1 and 2 and the
+       * |         |       link between 3 and 4 are noForward
+       * v         v       links in both directions.
+       * 4 <- - -> 3
+       */
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" 
+ AMQP_PORT_2).setRetryInterval(100);
+         amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setNoForward(true));
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "1to4", "tcp://localhost:" 
+ AMQP_PORT_4).setRetryInterval(100);

Review Comment:
   Similarly, should gate retries.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class AMQPNoForwardMirroringTest extends AmqpClientTestSupport {
+
+   protected static final int AMQP_PORT_2 = 5673;
+   protected static final int AMQP_PORT_3 = 5674;
+
+   ActiveMQServer server_2;
+   ActiveMQServer server_3;
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(AMQP_PORT, false);
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,CORE,OPENWIRE";
+   }
+
+   @Test
+   public void testNoForward() throws Exception {
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_3 = createServer(AMQP_PORT_3, false);
+
+      // name the servers, for convenience during debugging
+      server.getConfiguration().setName("1");
+      server_2.getConfiguration().setName("2");
+      server_3.getConfiguration().setName("3");
+
+      /**
+       *
+       * the mirroring topology:
+       *
+       * v---------|
+       * 1 ------> 2       The link between 1 and 2 is noForward=true
+       *           ^
+       *           v
+       *           3
+       */
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" 
+ AMQP_PORT_2).setRetryInterval(100);

Review Comment:
   Should add a gate on the retries (here and below).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact



Reply via email to