[ 
https://issues.apache.org/jira/browse/ARTEMIS-3243?focusedWorklogId=616597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-616597
 ]

ASF GitHub Bot logged work on ARTEMIS-3243:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Jun/21 16:35
            Start Date: 29/Jun/21 16:35
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on a change in pull request #3633:
URL: https://github.com/apache/activemq-artemis/pull/3633#discussion_r659722086



##########
File path: 
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
##########
@@ -127,6 +127,10 @@
    @Option(name = "--relax-jolokia", description = "disable strict checking on 
jolokia-access.xml")
    private boolean relaxJolokia;
 
+
+   @Option(name = "--cloud", description = "This is an instance that will be 
shared by a cloud folder")

Review comment:
       This feels like it should be in its own PR; it seems like a distinct 
feature that just happens to be used [in a few tests?] here. 
   
   Not sure 'cloud' is a great name for this, as I'm not seeing anything 
particularly cloudy about it. It appears to just force particular 
(undocumented) instance dirs, perhaps as a type of sharing? Is this really 
needed?

##########
File path: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java
##########
@@ -61,6 +69,10 @@ public final T borrow() {
    public final void release(T object) {
       if (internalPool != null) {
          internalPool.offer(object);
+      } else {
+         if (logger.isTraceEnabled()) {
+            logger.trace("internalPool was empty");

Review comment:
       Should this say 'was null' rather than 'was empty'?

##########
File path: artemis-server/src/main/resources/schema/artemis-configuration.xsd
##########
@@ -2185,11 +2193,11 @@
             </xsd:documentation>
          </xsd:annotation>
       </xsd:attribute>
-      <xsd:attribute name="source-mirror-address" type="xsd:string" 
use="optional" default="">
+      <xsd:attribute name="durable" type="xsd:boolean" use="optional" 
default="true">

Review comment:
       Does this mean it's no longer configurable? If so, what do existing folks 
that previously configured it do?

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an 
`<amqp-connection>` element within t
 
 *Notice:* If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
 
-*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs 
permission for all operations that you configure. Therefore, If you are using a 
security manager, ensure that you perform the configured operations as a user 
with sufficient permissions.
 
 # AMQP Server Connection Operations
 The following types of operations are supported on a AMQP server connection:
 
+* Mirrors
+    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 * Senders
     * Messages received on specific queues are transferred to another endpoint
 * Receivers
     * The broker pulls messages from another endpoint
 * Peers
     * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
-* Mirrors
-    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 
+## Mirrors
+The mirror option on the broker connection can capture events from the broker 
and pass them over the wire to another broker. This enables you to capture 
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored 
on a local queue, and later forwarded to a target destination on another 
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the 
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is 
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are 
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is 
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable 
temporary queue to store messages before they are sent to the other broker. If 
you define a name value for this property, an ANYCAST durable queue and address 
is created with the specified name.

Review comment:
       Is the queue still named after source-mirror-address and configurable? 
The XSD bits for that option seem to have been removed and a 'durable' option 
added? The latter doesn't seem to be covered here.

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
##########
@@ -162,6 +162,7 @@ protected ActiveMQServer createServer(int port, boolean 
start) throws Exception
 
       final ActiveMQServer server = this.createServer(true, true);
 
+      server.getConfiguration().setBrokerMirrorId((short)1);

Review comment:
       Feels like tests that need this should either override the creation 
process, or call an explicit method, to toggle this rather than doing it for 
all tests even if they aren't testing this. E.g. perhaps via 
addAcceptorConfiguration, which actually shows up lower in the context of this 
diff.

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
##########
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.cli.commands.tools.PrintData;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.StringPrintStream;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.jboss.logging.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BrokerInSyncTest extends AmqpClientTestSupport {
+
+   public static final int TIME_BEFORE_RESTART = 1000;
+   protected static final int AMQP_PORT_2 = 5673;
+   protected static final int AMQP_PORT_3 = 5674;
+   private static final Logger logger = 
Logger.getLogger(BrokerInSyncTest.class);
+   ActiveMQServer server_2;
+
+   @Before
+   public void startLogging() {
+      AssertionLoggerHandler.startCapture();
+   }
+
+   @After
+   public void stopLogging() {
+      try {
+         Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+      } finally {
+         AssertionLoggerHandler.stopCapture();
+      }
+   }
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(AMQP_PORT, false);
+   }
+
+   @Test
+   public void testSyncOnCreateQueues() throws Exception {
+      server.setIdentity("Server1");
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+         amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+      server.start();
+
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("Server2");
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+         amqpConnection.addElement(new 
AMQPMirrorBrokerConnectionElement().setDurable(true));
+         server_2.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+
+      server_2.start();
+
+      server_2.addAddressInfo(new 
AddressInfo("sometest").setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("sometest").setDurable(true));
+
+      Wait.assertTrue(() -> server_2.locateQueue("sometest") != null);
+      Wait.assertTrue(() -> server.locateQueue("sometest") != null);
+
+      server.addAddressInfo(new 
AddressInfo("OnServer1").setAutoCreated(false));
+      server.createQueue(new QueueConfiguration("OnServer1").setDurable(true));
+
+      Wait.assertTrue(() -> server.locateQueue("OnServer1") != null);
+      Wait.assertTrue("Sync is not working on the way back", () -> 
server_2.locateQueue("OnServer1") != null, 2000);
+
+      Wait.assertTrue(() -> server_2.locateQueue("sometest") != null);
+      Wait.assertTrue(() -> server.locateQueue("sometest") != null);
+
+      for (int i = 0; i < 10; i++) {
+         final int queueID = i;
+         server_2.createQueue(new QueueConfiguration("test2_" + 
i).setDurable(true));
+         server.createQueue(new QueueConfiguration("test1_" + 
i).setDurable(true));
+         Wait.assertTrue(() -> server.locateQueue("test2_" + queueID) != null);
+         Wait.assertTrue(() -> server.locateQueue("test1_" + queueID) != null);
+         Wait.assertTrue(() -> server_2.locateQueue("test2_" + queueID) != 
null);
+         Wait.assertTrue(() -> server_2.locateQueue("test1_" + queueID) != 
null);
+      }
+
+      server_2.stop();
+      server.stop();
+   }
+
+   @Test
+   public void testSyncData() throws Exception {
+      int NUMBER_OF_MESSAGES = 100;
+      server.setIdentity("Server1");
+      server.getConfiguration().setBrokerMirrorId((short) 1);

Review comment:
       Quite a lot of casts remaining after adding the int method to avoid 
casting (which this is presumably using, just after expansion).

##########
File path: 
artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
##########
@@ -39,6 +39,49 @@
 
    private static Logger log = Logger.getLogger(ByteUtilTest.class);
 
+
+   @Test
+   public void testMixIDs1() {
+      long mixedID = ByteUtil.mixByteAndLong((byte)3, -1L);  // just a note, 
-1 is 0Xffffff....
+      Assert.assertEquals((byte)3, ByteUtil.getFirstByte(mixedID));
+
+      long mixedIDWithMaxValue = ByteUtil.mixByteAndLong((byte)3, 
Long.MAX_VALUE);
+      // first byte is 03, all the rest is fff...
+      Assert.assertEquals(0x03ffffffffffffffL, mixedIDWithMaxValue);
+      // -1 and fff should return you the same value when mixing 03, I'm doing 
this check just in case.
+      Assert.assertEquals(mixedIDWithMaxValue, mixedID);

Review comment:
       Two totally different 'mixes' resulting in the same value is a good 
demonstration of why I think this 'ID munging' is a bad idea.
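
To make the collision concrete, here is a minimal sketch. It assumes the mix simply overwrites the top byte of the long with the given byte, which is consistent with the assertions in the test above; `MixCollisionSketch` and both helper methods are hypothetical stand-ins, not the actual `ByteUtil` code, which may differ.

```java
public class MixCollisionSketch {

    // Hypothetical stand-in for ByteUtil.mixByteAndLong: the byte replaces the
    // top 8 bits of the long, so the original top 8 bits of 'id' are discarded.
    static long mixByteAndLong(byte prefix, long id) {
        return ((long) (prefix & 0xFF) << 56) | (id & 0x00FFFFFFFFFFFFFFL);
    }

    // Hypothetical stand-in for ByteUtil.getFirstByte: reads the top 8 bits back.
    static byte getFirstByte(long mixed) {
        return (byte) (mixed >>> 56);
    }

    public static void main(String[] args) {
        long fromMinusOne = mixByteAndLong((byte) 3, -1L);
        long fromMaxValue = mixByteAndLong((byte) 3, Long.MAX_VALUE);

        // Two distinct inputs that differ only in their top byte map to one mixed value.
        System.out.println(fromMinusOne == fromMaxValue); // prints "true"
    }
}
```

Under that assumption any two IDs that differ only in their top byte become indistinguishable once mixed, so the original ID cannot be recovered unless it is known to fit in the lower 56 bits.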

##########
File path: 
artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
##########
@@ -39,6 +39,49 @@
 
    private static Logger log = Logger.getLogger(ByteUtilTest.class);
 
+
+   @Test
+   public void testMixIDs1() {
+      long mixedID = ByteUtil.mixByteAndLong((byte)3, -1L);  // just a note, 
-1 is 0Xffffff....
+      Assert.assertEquals((byte)3, ByteUtil.getFirstByte(mixedID));
+
+      long mixedIDWithMaxValue = ByteUtil.mixByteAndLong((byte)3, 
Long.MAX_VALUE);
+      // first byte is 03, all the rest is fff...
+      Assert.assertEquals(0x03ffffffffffffffL, mixedIDWithMaxValue);
+      // -1 and fff should return you the same value when mixing 03, I'm doing 
this check just in case.
+      Assert.assertEquals(mixedIDWithMaxValue, mixedID);
+   }
+
+   @Test
+   public void testUnsalt() {

Review comment:
       All in all this seems a long way of testing that 
ByteUtil.removeFirstByte(long) zeros the first byte. Feeding it a couple of 
corner cases like -1 etc and the known results would seem simpler.
   
   EDIT: If there is a 'testUnsalt' I might expect a 'testSalt'. This seems to 
be doing both so could at least be renamed. I'd suggest separating into two 
simpler tests. Actually, that's kind of what 'testMixIDs1' is doing.
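
A rough sketch of the corner-case style suggested here, written as plain Java checks rather than JUnit so it stands alone. `RemoveFirstByteSketch` and its `removeFirstByte` are hypothetical stand-ins that assume the real method just clears the top 8 bits; the actual test would call `ByteUtil.removeFirstByte(long)` instead.

```java
public class RemoveFirstByteSketch {

    // Hypothetical stand-in for ByteUtil.removeFirstByte(long): clears the top 8 bits.
    static long removeFirstByte(long value) {
        return value & 0x00FFFFFFFFFFFFFFL;
    }

    // Compares one input against its known result and fails loudly on a mismatch.
    static void check(long input, long expected) {
        long actual = removeFirstByte(input);
        if (actual != expected) {
            throw new AssertionError("input=0x" + Long.toHexString(input)
                + " expected=0x" + Long.toHexString(expected)
                + " actual=0x" + Long.toHexString(actual));
        }
    }

    public static void main(String[] args) {
        check(-1L, 0x00FFFFFFFFFFFFFFL);                 // all bits set
        check(Long.MAX_VALUE, 0x00FFFFFFFFFFFFFFL);      // top byte 0x7f
        check(Long.MIN_VALUE, 0L);                       // only the sign bit set
        check(0L, 0L);                                   // nothing to clear
        check(0x0312345678ABCDEFL, 0x0012345678ABCDEFL); // lower 56 bits preserved
        System.out.println("all corner cases passed");
    }
}
```

Each case is one input paired with one known result, which keeps a failure easy to read and avoids re-deriving the expected value with the same logic that is under test.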

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -490,12 +506,18 @@ private void connectSender(Queue queue,
             Source source = new Source();
             source.setAddress(queue.getAddress().toString());
             sender.setSource(source);
+            HashMap<Symbol, Object> mapProperties = new HashMap<>(1, 1); // 
this map is expected to have a single element, so load factor = 1
+            mapProperties.put(AMQPMirrorControllerSource.BROKER_ID, 
server.getMirrorBrokerId());
+            sender.setProperties(mapProperties);

Review comment:
       This method is also used for creating non-mirror senders. I don't think 
it should be adding the mirror ID on non-mirror links in those cases.
   
   This also goes to another comment elsewhere - defining a mirror-id appears 
to have been made mandatory, even if you aren't using mirroring, which seems 
quite annoying. Maybe this is why?

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -324,13 +337,13 @@ private static void 
uninstallMirrorController(AMQPMirrorBrokerConnectionElement
     *  It is returning the snfQueue to the replica, and I needed isolation 
from the actual instance.
     *  During development I had a mistake where I used a property from the 
Object,
     *  so, I needed this isolation for my organization and making sure nothing 
would be shared. */
-   private static Queue installMirrorController(AMQPBrokerConnection 
brokerConnection, AMQPMirrorBrokerConnectionElement replicaConfig, 
ActiveMQServer server) throws Exception {
+   private Queue installMirrorController(AMQPMirrorBrokerConnectionElement 
replicaConfig, ActiveMQServer server) throws Exception {

Review comment:
       The comment for the method seems to be describing why this method is 
static, which it isn't after this change, so the comment needs to be removed or 
updated.

##########
File path: 
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
##########
@@ -874,6 +868,36 @@ public Object run(ActionContext context) throws Exception {
       return null;
    }
 
+   private void generateETCParameters(HashMap<String, String> filters,
+                          File etcFolder,
+                          File oomeDumpFile,
+                          File dataFolder) throws IOException {
+
+
+
+      if (cloud) {
+         filters.put("${artemis.home}", "/opt/activemq-artemis");
+         filters.put("${artemis.instance}", "/var/lib/artemis-instance");
+         filters.put("${artemis.instance.data}", 
"/var/lib/artemis-instance/data");
+         filters.put("${artemis.instance.etc}", 
"/var/lib/artemis-instance/etc");
+         filters.put("${artemis.instance.etc.uri}", 
"file:/var/lib/artemis-instance/./etc/");
+         filters.put("${artemis.instance.uri}", 
"file:/var/lib/artemis-instance/./");
+         filters.put("${artemis.instance.oome.dump}", 
"/var/lib/artemis-instance/log/oom_dump.hprof");

Review comment:
       Seems a little presumptive that it would be assumed to be in /opt, /var 
etc. Also these don't seem to be documented anywhere. It's unclear why far 
fewer things are set here. Are Windows users not allowed to use '--cloud' (a 
restriction that is also undocumented)?

##########
File path: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/MpscPool.java
##########
@@ -33,12 +33,17 @@
  */
 public class MpscPool<T> extends Pool<T> {
 
+   private static final Logger logger = Logger.getLogger(MpscPool.class);
+
    public MpscPool(int maxSize, Consumer<T> cleaner, Supplier<T> supplier) {
       super(maxSize, cleaner, supplier);
    }
 
    @Override
    protected Queue<T> createQueue(int maxSize) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Creating Pool with size=" + maxSize + " for classes for 
" + supplier.get().getClass());

Review comment:
       Burning a value just to log its type, and exposing the supplier field to 
enable doing it, seems a bit ugly...is the type really needed here? The parent 
class looks to log the type when it is returning instances anyway.
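
One possible shape for this, as a sketch only: `PoolLoggingSketch`, `SimplePool` and `CountingSupplier` are hypothetical, simplified stand-ins rather than the real `Pool`/`MpscPool` classes, and `java.util.logging` is used here in place of the JBoss logger so the example stands alone. The creation log reports only the size, and the type is logged where an instance is being handed out anyway.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

public class PoolLoggingSketch {

    private static final Logger LOGGER = Logger.getLogger(PoolLoggingSketch.class.getName());

    // Counts how often the pool asks for a new instance.
    static final class CountingSupplier implements Supplier<StringBuilder> {
        int calls;

        @Override
        public StringBuilder get() {
            calls++;
            return new StringBuilder();
        }
    }

    // Hypothetical, simplified pool; not the real Pool/MpscPool classes.
    static final class SimplePool<T> {
        private final Queue<T> queue;
        private final Supplier<T> supplier; // stays private to the pool

        SimplePool(int maxSize, Supplier<T> supplier) {
            this.supplier = supplier;
            this.queue = new ArrayDeque<>(maxSize);
            if (LOGGER.isLoggable(Level.FINEST)) {
                // Log only what is already known; supplier.get() is not called here.
                LOGGER.finest("Creating Pool with size=" + maxSize);
            }
        }

        T borrow() {
            T pooled = queue.poll();
            T result = pooled != null ? pooled : supplier.get();
            if (LOGGER.isLoggable(Level.FINEST)) {
                // The type comes for free from a value that is actually being used.
                LOGGER.finest("Returning instance of " + result.getClass());
            }
            return result;
        }

        void release(T object) {
            queue.offer(object);
        }
    }

    public static void main(String[] args) {
        CountingSupplier supplier = new CountingSupplier();
        SimplePool<StringBuilder> pool = new SimplePool<>(4, supplier);
        System.out.println("supplier calls after creation: " + supplier.calls);
        pool.borrow();
        System.out.println("supplier calls after borrow: " + supplier.calls);
    }
}
```

With this arrangement, turning on trace logging has no side effect on the supplier, and the supplier field does not need wider visibility just for a log line.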

##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
##########
@@ -598,10 +598,29 @@ public void parseMainConfig(final Element e, final 
Configuration config) throws
          parseClusterConnectionConfigurationURI(ccNode, config);
       }
 
+/*      Integer mirrorBrokerId = getInteger(e, "broker-mirror-id", 0, (name, 
value) -> {
+         if (value != null) {
+            int valueInteger = (int) value;
+            if (valueInteger < 0 || valueInteger > 255) {
+               throw ActiveMQMessageBundle.BUNDLE.invalidBrokerID(value);
+            }
+         }
+      });
+
+      config.setBrokerMirrorId(mirrorBrokerId.shortValue());
+*/
 
       NodeList ccAMQPConnections = 
e.getElementsByTagName("broker-connections");
+      System.out.println("Connections length = " + 
ccAMQPConnections.getLength());

Review comment:
       Leftover debug System.out.println

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an 
`<amqp-connection>` element within t
 
 *Notice:* If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
 
-*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs 
permission for all operations that you configure. Therefore, If you are using a 
security manager, ensure that you perform the configured operations as a user 
with sufficient permissions.

Review comment:
       It's not clear what 'matching mirror ID' means here, which seems odd for 
something labelled important, as it hasn't been outlined at all other than to 
say each broker requires a unique mirror ID...matching what exactly?
   
   (As earlier, I think this ID should be optional or elsewhere)

##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
##########
@@ -598,10 +598,29 @@ public void parseMainConfig(final Element e, final 
Configuration config) throws
          parseClusterConnectionConfigurationURI(ccNode, config);
       }
 
+/*      Integer mirrorBrokerId = getInteger(e, "broker-mirror-id", 0, (name, 
value) -> {

Review comment:
       Commented out code leftover

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -7,21 +7,24 @@ Currently, this feature supports only the AMQP protocol. 
However, in the future,
 You configure broker connections using a `<broker-connections>` XML element in 
the `broker.xml` configuration file.
 
 ```xml
-<broker-connections>
+<broker-connections mirror-id="1">
     ...
 </broker-connections>
 ```
 
+Each broker requires a unique id, defined in the `<broker-connections>` XML 
element as `mirror-id`.
+
 # AMQP Server Connections
 
 An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. 
This means that the broker can connect to another AMQP server (not necessarily 
ActiveMQ Artemis) and create elements on that connection.
 
 To define an AMQP broker connection, add an `<amqp-connection>` element within 
the `<broker-connections` element in the `broker.xml` configuration file. For 
example:
 
 ```xml
-<broker-connections>
+<broker-connections source-mirror-id="1">
     <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" 
retry-interval="100" reconnect-attempts="-1" user="john" password="doe">
          ...
+    <mirror/>

Review comment:
       Wrong indentation.
   
   Also more importantly, the "..." was used to signify a placeholder for 'some 
content'. It was left otherwise empty last time round because this bit covers 
only the connection config rather than the child types, none of which have 
been covered yet at this point, but each of which has its own section 
immediately following.
   
   I would remove this and keep the placeholder nature of the "...", and if 
needed make it more obvious with say "...senders/receivers/mirrors...".

##########
File path: artemis-server/src/main/resources/schema/artemis-configuration.xsd
##########
@@ -2073,9 +2074,16 @@
    </xsd:complexType>
 
    <xsd:complexType name="brokerConnectType">
-      <xsd:sequence maxOccurs="unbounded">
+      <xsd:sequence maxOccurs="unbounded" minOccurs="0">
         <xsd:element name="amqp-connection" type="amqp-connectionUriType"/>
       </xsd:sequence>
+      <xsd:attribute name="mirror-id" type="xsd:unsignedByte" use="required">
+         <xsd:annotation>
+            <xsd:documentation>
+               When you connect mirrors towards this broker, you need a 
uniqute byte (less than 255, more than 0) to identify your broker.
+            </xsd:documentation>
+         </xsd:annotation>
+      </xsd:attribute>

Review comment:
       This feels a bit ugly, you always have to provide a mirror id here on 
'broker connections', even if you aren't doing mirroring? Feels like it should 
either be optional, and only enforced present if mirroring is then used, or be 
elsewhere such as down a level on a mirroring section. Or not be called a 
'mirror-id'.
   
   Typo in "uniqute".
   "1 to 255" might be simpler than "(less than 255, more than 0)"

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an 
`<amqp-connection>` element within t
 
 *Notice:* If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
 
-*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs 
permission for all operations that you configure. Therefore, If you are using a 
security manager, ensure that you perform the configured operations as a user 
with sufficient permissions.
 
 # AMQP Server Connection Operations
 The following types of operations are supported on a AMQP server connection:
 
+* Mirrors
+    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 * Senders
     * Messages received on specific queues are transferred to another endpoint
 * Receivers
     * The broker pulls messages from another endpoint
 * Peers
     * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
-* Mirrors
-    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 
+## Mirrors
+The mirror option on the broker connection can capture events from the broker 
and pass them over the wire to another broker. This enables you to capture 
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion

Review comment:
       Inconsistent capitalisation

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an 
`<amqp-connection>` element within t
 
 *Notice:* If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
 
-*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs 
permission for all operations that you configure. Therefore, If you are using a 
security manager, ensure that you perform the configured operations as a user 
with sufficient permissions.
 
 # AMQP Server Connection Operations
 The following types of operations are supported on a AMQP server connection:
 
+* Mirrors
+    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 * Senders
     * Messages received on specific queues are transferred to another endpoint
 * Receivers
     * The broker pulls messages from another endpoint
 * Peers
     * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
-* Mirrors
-    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 
+## Mirrors
+The mirror option on the broker connection can capture events from the broker 
and pass them over the wire to another broker. This enables you to capture 
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored 
on a local queue, and later forwarded to a target destination on another 
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the 
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is 
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are 
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is 
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable 
temporary queue to store messages before they are sent to the other broker. If 
you define a name value for this property, an ANYCAST durable queue and address 
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections source-mirror-id="1">
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-mirror">
+            <mirror  queue-removal="true" queue-creation="true" 
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+            <mirror target-mirror-id="2"/>
+    </amqp-connection>
+</broker-connections>
+```
+
+*Important*: A broker can mirror to multiple replicas (1 to many). However a 
replica broker can only have a single mirror source. Make sure you do not 
mirror multiple source brokers to a single replica broker.
+
+### Pre Existing Messages
+The broker will not send pre existing messages through the mirror. So, If you 
add mirror to your configuration and the journal had pre existing messages 
these messages will not be sent. 
+
+## Broker Connection Stop and Disconnect
+Once you start the broker connection with a mirror the mirror events will 
always be sent to the intermediate queue configured at the 
`source-mirror-address`.
+
+It is possible to stop the broker connection with the operation 
stopBrokerConnection(connectionName) on the ServerControl, but it is only 
effective to disconnect the brokers, while the mirror events are always 
captured.
+
+## Disaster & Recovery Considerations
+As you use the mirror option to replicate data across datacenters, you have to 
take a few considerations:
+
+* Currently we don't support quorums for activating the replica, so you have 
to manually control when your clients connect to the replica site.
+* Make sure the replica site is passive. Having producers and consumers 
connected into both sites would be messy and could lead you to data integrity 
issues.
+    * You can disable auto-start on the acceptor your clients use to connect, 
and only enable it after a disaster has occurred.
+* Only the queues and addresses are mirrored. Consumer states will have to be 
reapplied on the replica when the clients reconnects (that applies to message 
groups, exclusive consumers or anything related to clients)
+* Make sure your configuration options are copied over, including Diverts, 
security, last value queues, address settings and other configuration options.
+* Have a way back route after a disaster.
+    * You can have a disabled broker connection to be enabled after the 
disaster.
+
+
+## Mirror example with Failback
+On this example lets play with two brokers:
+- sourceBroker
+- replicaBroker
+
+Add this configuration on sourceBroker:
+
+```xml
+<broker-connections source-mirror-id="1">
+    <amqp-connection uri="tcp://replicaBroker:6700" name="DRSite">
+            <mirror message-acknowledgements="true"/>
+             <mirror target-mirror-id="2"/>

Review comment:
       Mismatched indents, both wrong. Plus same questions as earlier mirror 
config comments.

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an 
`<amqp-connection>` element within t
 
 *Notice:* If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
 
-*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs 
permission for all operations that you configure. Therefore, If you are using a 
security manager, ensure that you perform the configured operations as a user 
with sufficient permissions.
 
 # AMQP Server Connection Operations
 The following types of operations are supported on a AMQP server connection:
 
+* Mirrors
+    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 * Senders
     * Messages received on specific queues are transferred to another endpoint
 * Receivers
     * The broker pulls messages from another endpoint
 * Peers
     * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
-* Mirrors
-    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 
+## Mirrors
+The mirror option on the broker connection can capture events from the broker 
and pass them over the wire to another broker. This enables you to capture 
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored 
on a local queue, and later forwarded to a target destination on another 
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the 
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is 
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are 
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is 
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable 
temporary queue to store messages before they are sent to the other broker. If 
you define a name value for this property, an ANYCAST durable queue and address 
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections source-mirror-id="1">
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-mirror">
+            <mirror  queue-removal="true" queue-creation="true" 
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+            <mirror target-mirror-id="2"/>
+    </amqp-connection>
+</broker-connections>
+```
+
+*Important*: A broker can mirror to multiple replicas (1 to many). However a 
replica broker can only have a single mirror source. Make sure you do not 
mirror multiple source brokers to a single replica broker.

Review comment:
       Is this still true? Isn't that part of what the extra mirror-ID and 
horrible ID-munging is about handling?

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an 
`<amqp-connection>` element within t
 
 *Notice:* If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
 
-*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs 
permission for all operations that you configure. Therefore, If you are using a 
security manager, ensure that you perform the configured operations as a user 
with sufficient permissions.
 
 # AMQP Server Connection Operations
 The following types of operations are supported on a AMQP server connection:
 
+* Mirrors
+    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 * Senders
     * Messages received on specific queues are transferred to another endpoint
 * Receivers
     * The broker pulls messages from another endpoint
 * Peers
     * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
-* Mirrors
-    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 
+## Mirrors
+The mirror option on the broker connection can capture events from the broker 
and pass them over the wire to another broker. This enables you to capture 
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored 
on a local queue, and later forwarded to a target destination on another 
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the 
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is 
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are 
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is 
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable 
temporary queue to store messages before they are sent to the other broker. If 
you define a name value for this property, an ANYCAST durable queue and address 
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections source-mirror-id="1">
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-mirror">
+            <mirror  queue-removal="true" queue-creation="true" 
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+            <mirror target-mirror-id="2"/>

Review comment:
       As above, is source-mirror-address still configurable?
   
   Also, why are there 2 mirror elements on the same connection? What does that 
even mean?
   
   The 'target-mirror-id' option seemingly isn't covered in the doc or in the 
XSD changes. Is it actually a thing, or can this be removed?

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -69,11 +167,12 @@ Some examples are shown below.
 
 Using address expressions:
 ```xml
-<broker-connections>
+<broker-connections source-mirror-id="1">
     <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
         <sender address-match="queues.#"/>
         <!-- notice the local queues for remotequeues.# need to be created on 
this broker -->
         <receiver address-match="remotequeues.#"/>
+        <mirror target-mirror-id="2"/>

Review comment:
       Why is a section about senders and receivers, i.e. not mirrors, adding 
mirror config?

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an 
`<amqp-connection>` element within t
 
 *Notice:* If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
 
-*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs 
permission for all operations that you configure. Therefore, If you are using a 
security manager, ensure that you perform the configured operations as a user 
with sufficient permissions.
 
 # AMQP Server Connection Operations
 The following types of operations are supported on a AMQP server connection:
 
+* Mirrors
+    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 * Senders
     * Messages received on specific queues are transferred to another endpoint
 * Receivers
     * The broker pulls messages from another endpoint
 * Peers
     * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
-* Mirrors
-    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 
+## Mirrors
+The mirror option on the broker connection can capture events from the broker 
and pass them over the wire to another broker. This enables you to capture 
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored 
on a local queue, and later forwarded to a target destination on another 
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the 
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is 
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are 
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is 
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable 
temporary queue to store messages before they are sent to the other broker. If 
you define a name value for this property, an ANYCAST durable queue and address 
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections source-mirror-id="1">
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-mirror">
+            <mirror  queue-removal="true" queue-creation="true" 
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+            <mirror target-mirror-id="2"/>
+    </amqp-connection>
+</broker-connections>
+```
+
+*Important*: A broker can mirror to multiple replicas (1 to many). However a 
replica broker can only have a single mirror source. Make sure you do not 
mirror multiple source brokers to a single replica broker.
+
+### Pre Existing Messages
+The broker will not send pre existing messages through the mirror. So, If you 
add mirror to your configuration and the journal had pre existing messages 
these messages will not be sent. 
+
+## Broker Connection Stop and Disconnect
+Once you start the broker connection with a mirror the mirror events will 
always be sent to the intermediate queue configured at the 
`source-mirror-address`.

Review comment:
       Another reference to source-mirror-address

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an 
`<amqp-connection>` element within t
 
 *Notice:* If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
 
-*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs 
permission for all operations that you configure. Therefore, If you are using a 
security manager, ensure that you perform the configured operations as a user 
with sufficient permissions.
 
 # AMQP Server Connection Operations
 The following types of operations are supported on a AMQP server connection:
 
+* Mirrors
+    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 * Senders
     * Messages received on specific queues are transferred to another endpoint
 * Receivers
     * The broker pulls messages from another endpoint
 * Peers
     * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
-* Mirrors
-    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 
+## Mirrors
+The mirror option on the broker connection can capture events from the broker 
and pass them over the wire to another broker. This enables you to capture 
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored 
on a local queue, and later forwarded to a target destination on another 
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the 
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is 
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are 
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is 
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable 
temporary queue to store messages before they are sent to the other broker. If 
you define a name value for this property, an ANYCAST durable queue and address 
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections source-mirror-id="1">
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-mirror">
+            <mirror  queue-removal="true" queue-creation="true" 
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+            <mirror target-mirror-id="2"/>
+    </amqp-connection>
+</broker-connections>
+```
+
+*Important*: A broker can mirror to multiple replicas (1 to many). However a 
replica broker can only have a single mirror source. Make sure you do not 
mirror multiple source brokers to a single replica broker.
+
+### Pre Existing Messages
+The broker will not send pre existing messages through the mirror. So, If you 
add mirror to your configuration and the journal had pre existing messages 
these messages will not be sent. 
+
+## Broker Connection Stop and Disconnect
+Once you start the broker connection with a mirror the mirror events will 
always be sent to the intermediate queue configured at the 
`source-mirror-address`.
+
+It is possible to stop the broker connection with the operation 
stopBrokerConnection(connectionName) on the ServerControl, but it is only 
effective to disconnect the brokers, while the mirror events are always 
captured.
+
+## Disaster & Recovery Considerations
+As you use the mirror option to replicate data across datacenters, you have to 
take a few considerations:
+
+* Currently we don't support quorums for activating the replica, so you have 
to manually control when your clients connect to the replica site.
+* Make sure the replica site is passive. Having producers and consumers 
connected into both sites would be messy and could lead you to data integrity 
issues.
+    * You can disable auto-start on the acceptor your clients use to connect, 
and only enable it after a disaster has occurred.
+* Only the queues and addresses are mirrored. Consumer states will have to be 
reapplied on the replica when the clients reconnects (that applies to message 
groups, exclusive consumers or anything related to clients)
+* Make sure your configuration options are copied over, including Diverts, 
security, last value queues, address settings and other configuration options.
+* Have a way back route after a disaster.
+    * You can have a disabled broker connection to be enabled after the 
disaster.
+
+
+## Mirror example with Failback
+On this example lets play with two brokers:
+- sourceBroker
+- replicaBroker
+
+Add this configuration on sourceBroker:
+
+```xml
+<broker-connections source-mirror-id="1">
+    <amqp-connection uri="tcp://replicaBroker:6700" name="DRSite">
+            <mirror message-acknowledgements="true"/>
+             <mirror target-mirror-id="2"/>
+    </amqp-connection>
+</broker-connections>
+```
+
+On the replicaBroker, add a disabled broker connection for failing back after 
a disaster, and also set the acceptors with autoStart=false
+
+```xml
+
+<acceptors>
+     <!-- this one is for clients -->
+     <acceptor 
name="artemis">tcp://0.0.0.0:61616?autoStart=false;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;autoStart=false</acceptor>
+     <!-- this one is for DR communication -->
+     <acceptor 
name="amqp">tcp://0.0.0.0:6700?autoStart=true;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;autoStart=false</acceptor>

Review comment:
       The autoStart option is defined twice in each acceptor. In the second 
acceptor the two values actually conflict, one true and the other false.
   
   Seems like it would generally be good to simplify this to the config of 
actual interest. E.g. drop protocols not used or add a placeholder. Drop 
unrelated config that should be defaulting to the same values and that few 
people should ever touch, such as... well, most of these options really.

##########
File path: 
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/containersupport/TestContainerBase.java
##########
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.containersupport;
+
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

Review comment:
       Unused imports

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an 
`<amqp-connection>` element within t
 
 *Notice:* If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
 
-*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs 
permission for all operations that you configure. Therefore, If you are using a 
security manager, ensure that you perform the configured operations as a user 
with sufficient permissions.
 
 # AMQP Server Connection Operations
 The following types of operations are supported on a AMQP server connection:
 
+* Mirrors
+    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 * Senders
     * Messages received on specific queues are transferred to another endpoint
 * Receivers
     * The broker pulls messages from another endpoint
 * Peers
     * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
-* Mirrors
-    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
 
+## Mirrors
+The mirror option on the broker connection can capture events from the broker 
and pass them over the wire to another broker. This enables you to capture 
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored 
on a local queue, and later forwarded to a target destination on another 
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the 
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is 
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are 
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is 
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable 
temporary queue to store messages before they are sent to the other broker. If 
you define a name value for this property, an ANYCAST durable queue and address 
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections source-mirror-id="1">
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-mirror">
+            <mirror  queue-removal="true" queue-creation="true" 
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+            <mirror target-mirror-id="2"/>
+    </amqp-connection>
+</broker-connections>
+```
+
+*Important*: A broker can mirror to multiple replicas (1 to many). However a 
replica broker can only have a single mirror source. Make sure you do not 
mirror multiple source brokers to a single replica broker.
+
+### Pre Existing Messages
+The broker will not send pre existing messages through the mirror. So, If you 
add mirror to your configuration and the journal had pre existing messages 
these messages will not be sent. 
+
+## Broker Connection Stop and Disconnect
+Once you start the broker connection with a mirror the mirror events will 
always be sent to the intermediate queue configured at the 
`source-mirror-address`.
+
+It is possible to stop the broker connection with the operation 
stopBrokerConnection(connectionName) on the ServerControl, but it is only 
effective to disconnect the brokers, while the mirror events are always 
captured.
+
+## Disaster & Recovery Considerations
+As you use the mirror option to replicate data across datacenters, you have to 
take a few considerations:
+
+* Currently we don't support quorums for activating the replica, so you have 
to manually control when your clients connect to the replica site.
+* Make sure the replica site is passive. Having producers and consumers 
connected into both sites would be messy and could lead you to data integrity 
issues.
+    * You can disable auto-start on the acceptor your clients use to connect, 
and only enable it after a disaster has occurred.
+* Only the queues and addresses are mirrored. Consumer states will have to be 
reapplied on the replica when the clients reconnects (that applies to message 
groups, exclusive consumers or anything related to clients)
+* Make sure your configuration options are copied over, including Diverts, 
security, last value queues, address settings and other configuration options.
+* Have a way back route after a disaster.
+    * You can have a disabled broker connection to be enabled after the 
disaster.
+
+
+## Mirror example with Failback
+On this example lets play with two brokers:
+- sourceBroker
+- replicaBroker
+
+Add this configuration on sourceBroker:
+
+```xml
+<broker-connections source-mirror-id="1">
+    <amqp-connection uri="tcp://replicaBroker:6700" name="DRSite">
+            <mirror message-acknowledgements="true"/>
+             <mirror target-mirror-id="2"/>
+    </amqp-connection>
+</broker-connections>
+```
+
+On the replicaBroker, add a disabled broker connection for failing back after 
a disaster, and also set the acceptors with autoStart=false
+
+```xml
+
+<acceptors>
+     <!-- this one is for clients -->
+     <acceptor 
name="artemis">tcp://0.0.0.0:61616?autoStart=false;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;autoStart=false</acceptor>
+     <!-- this one is for DR communication -->
+     <acceptor 
name="amqp">tcp://0.0.0.0:6700?autoStart=true;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;autoStart=false</acceptor>
+</acceptors>
+<broker-connections source-mirror-id="2">
+    <amqp-connection uri="tcp://sourceBroker:6700" name="failbackBroker" 
auto-start="false">
+            <mirror message-acknowledgements="true"/>
+            <mirror target-mirror-id="1"/>

Review comment:
       Indent is wrong. Plus same questions as earlier mirror config comments.

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -202,8 +241,35 @@ private static Properties getProperties(Message message) {
 
    @Override
    public void postAcknowledge(MessageReference ref, AckReason reason) throws 
Exception {
+
+      MirrorController targetController = getControllerTarget();
+
+      if (targetController != null || ref.getQueue() != null && 
(ref.getQueue().isInternalQueue() || ref.getQueue().isMirrorController())) {
+         if (logger.isTraceEnabled()) {
+            logger.trace(server + " rejecting postAcknowledge queue=" + 
ref.getQueue().getName() + ", ref=" + ref + " to avoid infinite loop with the 
mirror (reflection)");
+         }
+         return;
+      }
+
+      if (logger.isTraceEnabled()) {
+         logger.trace(server + " postAcknowledge " + ref);
+      }
+
       if (acks && !ref.getQueue().isMirrorController()) { // we don't call 
postACK on snfqueues, otherwise we would get infinite loop because of this 
feedback
-         Message message = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, ref.getMessage().getMessageID());
+         Long internalIDObject = 
(Long)ref.getMessage().getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
+         long internalID;
+         if (internalIDObject == null) {
+            internalID = ByteUtil.mixByteAndLong(localMirrorId, 
ref.getMessageID());
+         } else {
+            internalID = internalIDObject.longValue();
+            if (logger.isTraceEnabled()) {
+               logger.trace("server " + server + " acking message " + ref);
+            }
+         }
+         if (logger.isTraceEnabled()) {
+            logger.trace(server + " sending ack message from server " + 
ByteUtil.getFirstByte(internalID) + " with messageID=" + 
ByteUtil.removeFirstByte(internalID));
+         }

Review comment:
       I continue to think this is a bad idea.
   
   In this bit of code we munge together two values to form one (perhaps 
silently dropping information from the original ID while doing so) unless that 
was already done before. Then, either way, we take that value and un-munge it 
in order to log the two components. Then we send it over the wire, where the 
other side will un-munge it and hopefully come up with 2 values it understands, 
and end up doing a whole bunch of similar hoop jumping with them.
   
   Meanwhile, on sending a regular message for delivery we use 2 entirely 
separate delivery annotations to carry the pristine values without any need to 
jump through hoops or possibly unwittingly throw away information from the ID. 
Always transmitting the values separately and simplifying the whole situation 
just seems like a far better way to go.
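   
   To illustrate the concern, here is a minimal sketch. It assumes a 
hypothetical packing scheme in which the mirror ID occupies the top 8 bits of 
a long and the message ID the remaining 56 bits. The class and method names 
below are made up for illustration and are not the actual `ByteUtil` 
implementation; the point is only that any scheme of this shape cannot 
round-trip an ID that uses the high bits.

```java
// Hypothetical sketch, not Artemis code: packs a one-byte mirror id and a
// message id into a single long, then unpacks them again.
final class IdPackingSketch {

    // Low 56 bits are kept for the message id (assumed layout).
    private static final long MESSAGE_ID_MASK = 0x00FFFFFFFFFFFFFFL;

    // Combine the two values; any bits of messageId above bit 55 are discarded.
    static long pack(byte mirrorId, long messageId) {
        return ((long) (mirrorId & 0xFF) << 56) | (messageId & MESSAGE_ID_MASK);
    }

    // Recover the mirror id from the top byte.
    static byte mirrorIdOf(long packed) {
        return (byte) (packed >>> 56);
    }

    // Recover the (possibly truncated) message id from the low 56 bits.
    static long messageIdOf(long packed) {
        return packed & MESSAGE_ID_MASK;
    }

    public static void main(String[] args) {
        long small = 42L;
        long large = (1L << 60) | 42L; // uses a bit above the 56-bit range

        // A small id survives the round trip.
        System.out.println(messageIdOf(pack((byte) 1, small)) == small);

        // A large id does not: the high bit is dropped without any error.
        System.out.println(messageIdOf(pack((byte) 1, large)) == large);
    }
}
```

   Carrying the mirror ID and the message ID as two separate values avoids 
both the packing step and the possibility of this kind of truncation.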

##########
File path: 
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/containersupport/ContainerService.java
##########
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.containersupport;
+
+import javax.jms.ConnectionFactory;
+
+import java.io.File;
+import java.util.function.Consumer;
+
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.Assert;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+public abstract class ContainerService {
+
+   public static final ContainerService service;
+
+   static {
+      ContainerService loadingService;
+      try {
+         String providerName = System.getProperty(ContainerService.class.getName() + ".service");
+         if (providerName == null) {
+            loadingService = new TestContainerImpl();
+         } else {
+            loadingService = (ContainerService) Class.forName(providerName).newInstance();
+         }
+      } catch (Throwable e) {
+         e.printStackTrace();
+         loadingService = null;
+      }
+
+      service = loadingService;
+   }
+
+   public static ContainerService getService() {
+      return service;
+   }
+
+   public abstract Object newNetwork();
+
+   public abstract Object newBrokerImage();
+
+   public abstract Object newInterconnectImage();
+
+   public abstract void setNetwork(Object container, Object network);
+
+   public abstract void exposePorts(Object container, Integer... ports);
+
+   public abstract void exposeFile(Object container, String hostPath, String containerPath);
+
+   public abstract void exposeFolder(Object container, String hostPath, String containerPath);
+
+   public abstract void copyFileToContainer(Object container, String hostPath, String containerPath);
+
+   public abstract void exposeBrokerHome(Object container, String brokerHome);
+
+   public abstract void startLogging(Object container, String prefix);
+
+   public abstract void start(Object container);
+
+   public abstract void kill(Object container);
+
+   public abstract void stop(Object container);
+
+   public abstract void restart(Object container);
+
+   public abstract int getPort(Object container, int mappedPort);
+
+   public abstract void exposeHosts(Object container, String... hosts);
+
+   public String getHost(Object container) {
+      return "localhost";
+   }
+
+   public abstract ConnectionFactory createCF(Object container, String protocol);
+
+   public abstract String createURI(Object container, int port);
+
+   public abstract ConnectionFactory createCF(Object container, String protocol, int port);
+   public abstract ConnectionFactory createCF(Object container, String protocol, int port, String extraURL);
+
+   public boolean waitForServerToStart(Object container, String username, String password, long timeout) throws InterruptedException {
+      long realTimeout = System.currentTimeMillis() + timeout;
+      while (System.currentTimeMillis() < realTimeout) {
+         try {
+            ConnectionFactory cf = createCF(container, "core");
+            cf.createConnection(username, password).close();
+            if (cf instanceof AutoCloseable) {
+               ((AutoCloseable)cf).close();
+            }
+            System.out.println("server started");
+         } catch (Exception e) {
+            System.out.println("awaiting server start at ");
+            Thread.sleep(500);
+            continue;
+         }
+         return true;
+      }
+
+      return false;
+   }
+
+   public abstract void logWait(Object container, String log);
+
+   private static class TestContainerImpl extends ContainerService {
+
+      @Override
+      public ConnectionFactory createCF(Object container, String protocol) {
+         return createCF(container, protocol, 61616);
+      }
+
+
+      @Override
+      public ConnectionFactory createCF(Object container, String protocol, int port) {
+         return CFUtil.createConnectionFactory("amqp", "tcp://" + getHost(container) + ":" + getPort(container, port));
+      }
+      @Override
+      public ConnectionFactory createCF(Object container, String protocol, int port, String extraURI) {
+         System.out.println("tcp://" + getHost(container) + ":" + getPort(container, port) + extraURI);
+         return CFUtil.createConnectionFactory("amqp", "tcp://" + getHost(container) + ":" + getPort(container, port) + extraURI);
+      }
+
+      @Override
+      public Object newNetwork() {
+         return Network.newNetwork();
+      }
+
+      @Override
+      public Object newBrokerImage() {
+         return new GenericContainer<>(DockerImageName.parse("artemis-centos"));
+      }
+
+      @Override
+      public Object newInterconnectImage() {
+         return new GenericContainer<>(DockerImageName.parse("quay.io/interconnectedcloud/qdrouterd:latest"));
+      }
+
+      @Override
+      public void setNetwork(Object container, Object network) {
+         ((GenericContainer)container).setNetwork((Network)network);
+      }
+
+      @Override
+      public void exposePorts(Object container, Integer... ports) {
+         ((GenericContainer)container).withExposedPorts(ports);
+      }
+
+      @Override
+      public void exposeFile(Object container, String hostPath, String containerPath) {
+         File file = new File(hostPath);
+         Assert.assertTrue(file.exists());
+         Assert.assertFalse(file.isDirectory());
+         ((GenericContainer)container).withFileSystemBind(hostPath, containerPath);
+      }
+
+      @Override
+      public void exposeFolder(Object container, String hostPath, String containerPath) {
+         File file = new File(hostPath);
+         Assert.assertTrue(file.exists());
+         Assert.assertTrue(file.isDirectory());
+         ((GenericContainer)container).withFileSystemBind(hostPath, containerPath);
+      }
+
+      @Override
+      public void copyFileToContainer(Object container, String hostPath, String containerPath) {
+         File file = new File(hostPath);
+         Assert.assertTrue(file.exists());
+         Assert.assertFalse(file.isDirectory());
+         ((GenericContainer)container).withCopyFileToContainer(MountableFile.forHostPath(hostPath), containerPath);
+      }
+
+      @Override
+      public void exposeBrokerHome(Object container, String brokerHome) {
+         exposeFolder(container, brokerHome, "/var/lib/artemis-instance");
+      }
+
+      @Override
+      public void start(Object container) {
+         ((GenericContainer)container).setStartupCheckStrategy(new IsRunningStartupCheckStrategy());
+         ((GenericContainer)container).start();
+      }
+
+      @Override
+      public void restart(Object containerObj) {
+         kill(containerObj);
+         start(containerObj);
+      }
+
+      @Override
+      public void kill(Object containerObj) {
+         GenericContainer container = (GenericContainer) containerObj;
+         container.getDockerClient().killContainerCmd(container.getContainerId()).exec();
+         container.stop();
+      }
+
+      @Override
+      public int getPort(Object container, int mappedPort) {
+         return ((GenericContainer)container).getMappedPort(mappedPort);
+      }
+
+      @Override
+      public void exposeHosts(Object container, String... hosts) {
+         ((GenericContainer)container).withNetworkAliases(hosts);
+      }
+
+      @Override
+      public void stop(Object container) {
+         if (container != null) {
+            ((GenericContainer) container).stop();
+         }
+      }
+
+      @Override
+      public void startLogging(Object container, String prefix) {
+         ((GenericContainer)container).withLogConsumer(new Consumer<OutputFrame>() {
+            @Override
+            public void accept(OutputFrame outputFrame) {
+               System.out.print(prefix + outputFrame.getUtf8String());
+            }
+         });
+      }
+
+      @Override
+      public void logWait(Object container, String log) {
+         LogMessageWaitStrategy logMessageWaitStrategy = new LogMessageWaitStrategy();
+         logMessageWaitStrategy.withRegEx(log);
+         ((GenericContainer)container).setWaitStrategy(logMessageWaitStrategy);
+      }
+
+      public String createURI(Object container, int port) {

Review comment:
       Missing override
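   
   For context, a minimal sketch of what the annotation buys, assuming a 
simplified signature (`String host, int port`) rather than the 
`Object container, int port` one in the diff; the class names are 
hypothetical. With `@Override` present, the compiler rejects the method if it 
stops matching an abstract or inherited declaration, for example after a 
rename or a parameter change in the base class.

```java
public class OverrideSketch {

   // Hypothetical stand-in for the abstract service in the diff.
   public abstract static class Service {
      public abstract String createURI(String host, int port);
   }

   public static class Impl extends Service {
      // If the base signature changes, this annotation turns the mismatch
      // into a compile error instead of a silently unrelated method.
      @Override
      public String createURI(String host, int port) {
         return "tcp://" + host + ":" + port;
      }
   }

   public static void main(String[] args) {
      Service service = new Impl();
      System.out.println(service.createURI("localhost", 61616));
   }
}
```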






Issue Time Tracking
-------------------

    Worklog Id:     (was: 616597)
    Time Spent: 2h 20m  (was: 2h 10m)

> Enhance AMQP Mirror support with dual mirror
> --------------------------------------------
>
>                 Key: ARTEMIS-3243
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3243
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.17.0
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.18.0
>
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> In the current Mirror version, we can only mirror in a single direction.
> With this enhancement, the two (or more) brokers would be connected to each 
> other, each one having its own ID, and each one would send updates to the 
> other broker.
> The outcome is that if you just transferred producers and consumers from one 
> broker to the other, the fallback would be automatic and simple. No need to 
> disable and enable mirror options.



