[
https://issues.apache.org/jira/browse/ARTEMIS-3243?focusedWorklogId=616597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-616597
]
ASF GitHub Bot logged work on ARTEMIS-3243:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 29/Jun/21 16:35
Start Date: 29/Jun/21 16:35
Worklog Time Spent: 10m
Work Description: gemmellr commented on a change in pull request #3633:
URL: https://github.com/apache/activemq-artemis/pull/3633#discussion_r659722086
##########
File path:
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
##########
@@ -127,6 +127,10 @@
@Option(name = "--relax-jolokia", description = "disable strict checking on
jolokia-access.xml")
private boolean relaxJolokia;
+
+ @Option(name = "--cloud", description = "This is an instance that will be
shared by a cloud folder")
Review comment:
This feels like it should be in its own PR; it seems like a distinct
feature, just one possibly used [in a few tests?] here.
Not sure 'cloud' is a great name for this, I'm not seeing anything particularly
cloudy about it. It appears to just force particular (undocumented)
instance dirs, perhaps as a type of sharing? Is this really needed?
##########
File path:
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java
##########
@@ -61,6 +69,10 @@ public final T borrow() {
public final void release(T object) {
if (internalPool != null) {
internalPool.offer(object);
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.trace("internalPool was empty");
Review comment:
Should this say "was null"? This branch is taken when internalPool is null,
not when it is empty.
##########
File path: artemis-server/src/main/resources/schema/artemis-configuration.xsd
##########
@@ -2185,11 +2193,11 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
- <xsd:attribute name="source-mirror-address" type="xsd:string"
use="optional" default="">
+ <xsd:attribute name="durable" type="xsd:boolean" use="optional"
default="true">
Review comment:
Does this mean it's no longer configurable? If so, what do existing folks
who previously configured it do?
##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an
`<amqp-connection>` element within t
*Notice:* If you disable auto-start on the broker connection, the start of the
broker connection will only happen after the management method
`startBrokerConnection(connectionName)` is called on the ServerController.
-*Important*: The target endpoint needs permission for all operations that you
configure. Therefore, If you are using a security manager, ensure that you
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs
permission for all operations that you configure. Therefore, If you are using a
security manager, ensure that you perform the configured operations as a user
with sufficient permissions.
# AMQP Server Connection Operations
The following types of operations are supported on a AMQP server connection:
+* Mirrors
+ * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
* Senders
* Messages received on specific queues are transferred to another endpoint
* Receivers
* The broker pulls messages from another endpoint
* Peers
* The broker creates both senders and receivers on another endpoint that
knows how to handle them. Currently, this is implemented by Apache Qpid
Dispatch.
-* Mirrors
- * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
+## Mirrors
+The mirror option on the broker connection can capture events from the broker
and pass them over the wire to another broker. This enables you to capture
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored
on a local queue, and later forwarded to a target destination on another
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable
temporary queue to store messages before they are sent to the other broker. If
you define a name value for this property, an ANYCAST durable queue and address
is created with the specified name.
Review comment:
Is the queue still named after source-mirror-address and configurable?
The XSD bits for that option seem to have been removed and a 'durable' option
added. The latter doesn't seem to be covered here.
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
##########
@@ -162,6 +162,7 @@ protected ActiveMQServer createServer(int port, boolean
start) throws Exception
final ActiveMQServer server = this.createServer(true, true);
+ server.getConfiguration().setBrokerMirrorId((short)1);
Review comment:
It feels like tests that need this should either override the creation
process or call an explicit method to toggle it, rather than doing it for all
tests even if they aren't testing this. E.g. perhaps via
addAcceptorConfiguration, which actually shows up lower in the context of this
diff.
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
##########
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.cli.commands.tools.PrintData;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.StringPrintStream;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.jboss.logging.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BrokerInSyncTest extends AmqpClientTestSupport {
+
+ public static final int TIME_BEFORE_RESTART = 1000;
+ protected static final int AMQP_PORT_2 = 5673;
+ protected static final int AMQP_PORT_3 = 5674;
+ private static final Logger logger =
Logger.getLogger(BrokerInSyncTest.class);
+ ActiveMQServer server_2;
+
+ @Before
+ public void startLogging() {
+ AssertionLoggerHandler.startCapture();
+ }
+
+ @After
+ public void stopLogging() {
+ try {
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+ } finally {
+ AssertionLoggerHandler.stopCapture();
+ }
+ }
+
+ @Override
+ protected ActiveMQServer createServer() throws Exception {
+ return createServer(AMQP_PORT, false);
+ }
+
+ @Test
+ public void testSyncOnCreateQueues() throws Exception {
+ server.setIdentity("Server1");
+ {
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
+ amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ }
+ server.start();
+
+ server_2 = createServer(AMQP_PORT_2, false);
+ server_2.setIdentity("Server2");
+
+ {
+ AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ amqpConnection.addElement(new
AMQPMirrorBrokerConnectionElement().setDurable(true));
+ server_2.getConfiguration().addAMQPConnection(amqpConnection);
+ }
+
+ server_2.start();
+
+ server_2.addAddressInfo(new
AddressInfo("sometest").setAutoCreated(false));
+ server_2.createQueue(new
QueueConfiguration("sometest").setDurable(true));
+
+ Wait.assertTrue(() -> server_2.locateQueue("sometest") != null);
+ Wait.assertTrue(() -> server.locateQueue("sometest") != null);
+
+ server.addAddressInfo(new
AddressInfo("OnServer1").setAutoCreated(false));
+ server.createQueue(new QueueConfiguration("OnServer1").setDurable(true));
+
+ Wait.assertTrue(() -> server.locateQueue("OnServer1") != null);
+ Wait.assertTrue("Sync is not working on the way back", () ->
server_2.locateQueue("OnServer1") != null, 2000);
+
+ Wait.assertTrue(() -> server_2.locateQueue("sometest") != null);
+ Wait.assertTrue(() -> server.locateQueue("sometest") != null);
+
+ for (int i = 0; i < 10; i++) {
+ final int queueID = i;
+ server_2.createQueue(new QueueConfiguration("test2_" +
i).setDurable(true));
+ server.createQueue(new QueueConfiguration("test1_" +
i).setDurable(true));
+ Wait.assertTrue(() -> server.locateQueue("test2_" + queueID) != null);
+ Wait.assertTrue(() -> server.locateQueue("test1_" + queueID) != null);
+ Wait.assertTrue(() -> server_2.locateQueue("test2_" + queueID) !=
null);
+ Wait.assertTrue(() -> server_2.locateQueue("test1_" + queueID) !=
null);
+ }
+
+ server_2.stop();
+ server.stop();
+ }
+
+ @Test
+ public void testSyncData() throws Exception {
+ int NUMBER_OF_MESSAGES = 100;
+ server.setIdentity("Server1");
+ server.getConfiguration().setBrokerMirrorId((short) 1);
Review comment:
Quite a lot of casts remaining after adding the int method to avoid
casting (which this is presumably using, just after expansion).
##########
File path:
artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
##########
@@ -39,6 +39,49 @@
private static Logger log = Logger.getLogger(ByteUtilTest.class);
+
+ @Test
+ public void testMixIDs1() {
+ long mixedID = ByteUtil.mixByteAndLong((byte)3, -1L); // just a note,
-1 is 0Xffffff....
+ Assert.assertEquals((byte)3, ByteUtil.getFirstByte(mixedID));
+
+ long mixedIDWithMaxValue = ByteUtil.mixByteAndLong((byte)3,
Long.MAX_VALUE);
+ // first byte is 03, all the rest is fff...
+ Assert.assertEquals(0x03ffffffffffffffL, mixedIDWithMaxValue);
+ // -1 and fff should return you the same value when mixing 03, I'm doing
this check just in case.
+ Assert.assertEquals(mixedIDWithMaxValue, mixedID);
Review comment:
Two totally different 'mixes' resulting in the same value is a good
demonstration of why I think this 'ID munging' is a bad idea.
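To make the collision concrete, here is a minimal sketch. It assumes the mixing works the way the test's expected value (0x03ffffffffffffffL) implies: the byte goes into the top 8 bits and only the low 56 bits of the long survive. The `mix` and `firstByte` methods below are hypothetical stand-ins written for illustration, not the actual `ByteUtil` code from the PR.

```java
public final class MixCollisionDemo {

    // Hypothetical stand-in for ByteUtil.mixByteAndLong (an assumption, not the PR's code):
    // the prefix byte occupies the top 8 bits, and only the low 56 bits of the id are kept.
    public static long mix(byte prefix, long id) {
        return ((long) (prefix & 0xFF) << 56) | (id & 0x00FFFFFFFFFFFFFFL);
    }

    // Hypothetical stand-in for ByteUtil.getFirstByte: read back the top 8 bits.
    public static byte firstByte(long mixed) {
        return (byte) (mixed >>> 56);
    }

    public static void main(String[] args) {
        long fromMinusOne = mix((byte) 3, -1L);
        long fromMaxValue = mix((byte) 3, Long.MAX_VALUE);

        // The top byte of the original id is discarded, so ids that differ
        // only in their top 8 bits become indistinguishable after mixing.
        System.out.println(fromMinusOne == fromMaxValue); // prints true
        System.out.println(firstByte(fromMinusOne));      // prints 3
    }
}
```

Under this assumption, any two ids that differ only in their top byte map to the same mixed value, so the original id cannot be recovered from the mixed one.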
##########
File path:
artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
##########
@@ -39,6 +39,49 @@
private static Logger log = Logger.getLogger(ByteUtilTest.class);
+
+ @Test
+ public void testMixIDs1() {
+ long mixedID = ByteUtil.mixByteAndLong((byte)3, -1L); // just a note,
-1 is 0Xffffff....
+ Assert.assertEquals((byte)3, ByteUtil.getFirstByte(mixedID));
+
+ long mixedIDWithMaxValue = ByteUtil.mixByteAndLong((byte)3,
Long.MAX_VALUE);
+ // first byte is 03, all the rest is fff...
+ Assert.assertEquals(0x03ffffffffffffffL, mixedIDWithMaxValue);
+ // -1 and fff should return you the same value when mixing 03, I'm doing
this check just in case.
+ Assert.assertEquals(mixedIDWithMaxValue, mixedID);
+ }
+
+ @Test
+ public void testUnsalt() {
Review comment:
All in all this seems a long way of testing that
ByteUtil.removeFirstByte(long) zeros the first byte. Feeding it a couple of
corner cases like -1 etc. and checking the known results would seem simpler.
EDIT: If there is a 'testUnsalt' I might expect a 'testSalt'. This seems to
be doing both so could at least be renamed. I'd suggest separating into two
simpler tests. Actually, that's kind of what 'testMixIDs1' is doing.
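A rough sketch of the corner-case style I mean. It assumes removeFirstByte simply masks off the top 8 bits; the `removeFirstByte` method below is a hypothetical stand-in so the example runs on its own, and the class and helper names are made up for illustration. In the real test these would be plain `Assert.assertEquals` calls against `ByteUtil`.

```java
public final class RemoveFirstByteCheck {

    // Hypothetical stand-in for ByteUtil.removeFirstByte(long) (an assumption):
    // zero the top 8 bits and keep the low 56 bits unchanged.
    public static long removeFirstByte(long value) {
        return value & 0x00FFFFFFFFFFFFFFL;
    }

    // Tiny assertion helper so the sketch has no test-framework dependency.
    static void check(long input, long expected) {
        long actual = removeFirstByte(input);
        if (actual != expected) {
            throw new AssertionError("removeFirstByte(0x" + Long.toHexString(input)
                + ") gave 0x" + Long.toHexString(actual)
                + ", expected 0x" + Long.toHexString(expected));
        }
    }

    public static void main(String[] args) {
        // Known inputs paired with known results, one corner case per line.
        check(0L, 0L);
        check(-1L, 0x00FFFFFFFFFFFFFFL);
        check(Long.MAX_VALUE, 0x00FFFFFFFFFFFFFFL);
        check(Long.MIN_VALUE, 0L);
        check(0x0300000000000001L, 1L);
        System.out.println("all corner cases passed");
    }
}
```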
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -490,12 +506,18 @@ private void connectSender(Queue queue,
Source source = new Source();
source.setAddress(queue.getAddress().toString());
sender.setSource(source);
+ HashMap<Symbol, Object> mapProperties = new HashMap<>(1, 1); //
this map is expected to have a single element, so load factor = 1
+ mapProperties.put(AMQPMirrorControllerSource.BROKER_ID,
server.getMirrorBrokerId());
+ sender.setProperties(mapProperties);
Review comment:
This method is also used for creating non-mirror senders. I don't think
it should be adding the mirror ID on non-mirror links in those cases.
This also goes to another comment elsewhere - defining a mirror-id appears
to have been made mandatory, even if you aren't using mirroring which seems
quite annoying. Maybe this is why?
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -324,13 +337,13 @@ private static void
uninstallMirrorController(AMQPMirrorBrokerConnectionElement
* It is returning the snfQueue to the replica, and I needed isolation
from the actual instance.
* During development I had a mistake where I used a property from the
Object,
* so, I needed this isolation for my organization and making sure nothing
would be shared. */
- private static Queue installMirrorController(AMQPBrokerConnection
brokerConnection, AMQPMirrorBrokerConnectionElement replicaConfig,
ActiveMQServer server) throws Exception {
+ private Queue installMirrorController(AMQPMirrorBrokerConnectionElement
replicaConfig, ActiveMQServer server) throws Exception {
Review comment:
The comment for the method seems to be describing why this method is
static, which it isn't after this change, so the comment needs to be removed or
updated.
##########
File path:
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
##########
@@ -874,6 +868,36 @@ public Object run(ActionContext context) throws Exception {
return null;
}
+ private void generateETCParameters(HashMap<String, String> filters,
+ File etcFolder,
+ File oomeDumpFile,
+ File dataFolder) throws IOException {
+
+
+
+ if (cloud) {
+ filters.put("${artemis.home}", "/opt/activemq-artemis");
+ filters.put("${artemis.instance}", "/var/lib/artemis-instance");
+ filters.put("${artemis.instance.data}",
"/var/lib/artemis-instance/data");
+ filters.put("${artemis.instance.etc}",
"/var/lib/artemis-instance/etc");
+ filters.put("${artemis.instance.etc.uri}",
"file:/var/lib/artemis-instance/./etc/");
+ filters.put("${artemis.instance.uri}",
"file:/var/lib/artemis-instance/./");
+ filters.put("${artemis.instance.oome.dump}",
"/var/lib/artemis-instance/log/oom_dump.hprof");
Review comment:
Seems a little presumptuous to assume it would be in /opt, /var
etc. Also these don't seem to be documented anywhere. Unclear why there are many
fewer things set here. Are Windows users not allowed to use '--cloud' (a
restriction that is also undocumented)?
##########
File path:
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/MpscPool.java
##########
@@ -33,12 +33,17 @@
*/
public class MpscPool<T> extends Pool<T> {
+ private static final Logger logger = Logger.getLogger(MpscPool.class);
+
public MpscPool(int maxSize, Consumer<T> cleaner, Supplier<T> supplier) {
super(maxSize, cleaner, supplier);
}
@Override
protected Queue<T> createQueue(int maxSize) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Creating Pool with size=" + maxSize + " for classes for
" + supplier.get().getClass());
Review comment:
Burning a value just to log its type, and exposing the supplier field to
enable doing it, seems a bit ugly...is the type really needed here? The parent
class looks to log the type when it is returning instances anyway.
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
##########
@@ -598,10 +598,29 @@ public void parseMainConfig(final Element e, final
Configuration config) throws
parseClusterConnectionConfigurationURI(ccNode, config);
}
+/* Integer mirrorBrokerId = getInteger(e, "broker-mirror-id", 0, (name,
value) -> {
+ if (value != null) {
+ int valueInteger = (int) value;
+ if (valueInteger < 0 || valueInteger > 255) {
+ throw ActiveMQMessageBundle.BUNDLE.invalidBrokerID(value);
+ }
+ }
+ });
+
+ config.setBrokerMirrorId(mirrorBrokerId.shortValue());
+*/
NodeList ccAMQPConnections =
e.getElementsByTagName("broker-connections");
+ System.out.println("Connections length = " +
ccAMQPConnections.getLength());
Review comment:
Leftover debug System.out.println
##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an
`<amqp-connection>` element within t
*Notice:* If you disable auto-start on the broker connection, the start of the
broker connection will only happen after the management method
`startBrokerConnection(connectionName)` is called on the ServerController.
-*Important*: The target endpoint needs permission for all operations that you
configure. Therefore, If you are using a security manager, ensure that you
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs
permission for all operations that you configure. Therefore, If you are using a
security manager, ensure that you perform the configured operations as a user
with sufficient permissions.
Review comment:
It's not clear what 'matching mirror ID' means here, which seems odd for
something labelled important, as it hasn't been outlined at all other than to
say each broker requires a unique mirror ID...matching what exactly?
(As earlier, I think this ID should be optional or elsewhere)
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
##########
@@ -598,10 +598,29 @@ public void parseMainConfig(final Element e, final
Configuration config) throws
parseClusterConnectionConfigurationURI(ccNode, config);
}
+/* Integer mirrorBrokerId = getInteger(e, "broker-mirror-id", 0, (name,
value) -> {
Review comment:
Commented out code leftover
##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -7,21 +7,24 @@ Currently, this feature supports only the AMQP protocol.
However, in the future,
You configure broker connections using a `<broker-connections>` XML element in
the `broker.xml` configuration file.
```xml
-<broker-connections>
+<broker-connections mirror-id="1">
...
</broker-connections>
```
+Each broker requires a unique id, defined in the `<broker-connections>` XML
element as `mirror-id`.
+
# AMQP Server Connections
An ActiveMQ Artemis broker can initiate connections using the AMQP protocol.
This means that the broker can connect to another AMQP server (not necessarily
ActiveMQ Artemis) and create elements on that connection.
To define an AMQP broker connection, add an `<amqp-connection>` element within
the `<broker-connections` element in the `broker.xml` configuration file. For
example:
```xml
-<broker-connections>
+<broker-connections source-mirror-id="1">
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"
retry-interval="100" reconnect-attempts="-1" user="john" password="doe">
...
+ <mirror/>
Review comment:
Wrong indentation.
Also, more importantly, the "..." was used to signify a placeholder for 'some
content'. It was left otherwise empty last time round as this bit was
covering only the connection config rather than the children types, none of
which have been covered yet, but each of which has its own section immediately
following.
I would remove this and keep the placeholder nature of the "..."; if needed,
make it more obvious with, say, "...senders/receivers/mirrors...".
##########
File path: artemis-server/src/main/resources/schema/artemis-configuration.xsd
##########
@@ -2073,9 +2074,16 @@
</xsd:complexType>
<xsd:complexType name="brokerConnectType">
- <xsd:sequence maxOccurs="unbounded">
+ <xsd:sequence maxOccurs="unbounded" minOccurs="0">
<xsd:element name="amqp-connection" type="amqp-connectionUriType"/>
</xsd:sequence>
+ <xsd:attribute name="mirror-id" type="xsd:unsignedByte" use="required">
+ <xsd:annotation>
+ <xsd:documentation>
+ When you connect mirrors towards this broker, you need a
uniqute byte (less than 255, more than 0) to identify your broker.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
Review comment:
This feels a bit ugly: you always have to provide a mirror id here on
'broker connections', even if you aren't doing mirroring? Feels like it should
either be optional, and only enforced present if mirroring is then used, or be
elsewhere such as down a level on a mirroring section. Or not be called a
'mirror-id'.
Typo in "uniqute".
"1 to 255" might be simpler than "(less than 255, more than 0)".
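As a rough illustration of the "optional, only enforced when mirroring is used" idea, something along these lines. This is purely a hypothetical sketch: `MirrorIdCheck`, `validate` and its parameters are made-up names, not anything in the PR or in the existing configuration parser.

```java
import java.util.OptionalInt;

public final class MirrorIdCheck {

    // Hypothetical helper: accepts a missing mirror-id unless a mirror is configured,
    // and enforces the documented 1 to 255 range whenever an id is present.
    public static void validate(OptionalInt mirrorId, boolean mirroringConfigured) {
        if (!mirrorId.isPresent()) {
            if (mirroringConfigured) {
                throw new IllegalArgumentException("mirror-id is required when a mirror is configured");
            }
            return; // no mirroring, no id needed
        }
        int id = mirrorId.getAsInt();
        if (id < 1 || id > 255) {
            throw new IllegalArgumentException("mirror-id must be in the range 1 to 255, got " + id);
        }
    }

    public static void main(String[] args) {
        validate(OptionalInt.empty(), false); // fine: no mirroring, no id
        validate(OptionalInt.of(1), true);    // fine: mirroring with a valid id
        try {
            validate(OptionalInt.empty(), true);
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```

With a check of this shape, brokers that only use senders, receivers or peers would not need to declare an id at all.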
##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an
`<amqp-connection>` element within t
*Notice:* If you disable auto-start on the broker connection, the start of the
broker connection will only happen after the management method
`startBrokerConnection(connectionName)` is called on the ServerController.
-*Important*: The target endpoint needs permission for all operations that you
configure. Therefore, If you are using a security manager, ensure that you
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs
permission for all operations that you configure. Therefore, If you are using a
security manager, ensure that you perform the configured operations as a user
with sufficient permissions.
# AMQP Server Connection Operations
The following types of operations are supported on a AMQP server connection:
+* Mirrors
+ * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
* Senders
* Messages received on specific queues are transferred to another endpoint
* Receivers
* The broker pulls messages from another endpoint
* Peers
* The broker creates both senders and receivers on another endpoint that
knows how to handle them. Currently, this is implemented by Apache Qpid
Dispatch.
-* Mirrors
- * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
+## Mirrors
+The mirror option on the broker connection can capture events from the broker
and pass them over the wire to another broker. This enables you to capture
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
Review comment:
Inconsistent capitalisation
##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an
`<amqp-connection>` element within t
*Notice:* If you disable auto-start on the broker connection, the start of the
broker connection will only happen after the management method
`startBrokerConnection(connectionName)` is called on the ServerController.
-*Important*: The target endpoint needs permission for all operations that you
configure. Therefore, If you are using a security manager, ensure that you
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs
permission for all operations that you configure. Therefore, If you are using a
security manager, ensure that you perform the configured operations as a user
with sufficient permissions.
# AMQP Server Connection Operations
The following types of operations are supported on a AMQP server connection:
+* Mirrors
+ * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
* Senders
* Messages received on specific queues are transferred to another endpoint
* Receivers
* The broker pulls messages from another endpoint
* Peers
* The broker creates both senders and receivers on another endpoint that
knows how to handle them. Currently, this is implemented by Apache Qpid
Dispatch.
-* Mirrors
- * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
+## Mirrors
+The mirror option on the broker connection can capture events from the broker
and pass them over the wire to another broker. This enables you to capture
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored
on a local queue, and later forwarded to a target destination on another
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable
temporary queue to store messages before they are sent to the other broker. If
you define a name value for this property, an ANYCAST durable queue and address
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections source-mirror-id="1">
+ <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-mirror">
+ <mirror queue-removal="true" queue-creation="true"
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+ <mirror target-mirror-id="2"/>
+ </amqp-connection>
+</broker-connections>
+```
+
+*Important*: A broker can mirror to multiple replicas (1 to many). However a
replica broker can only have a single mirror source. Make sure you do not
mirror multiple source brokers to a single replica broker.
+
+### Pre Existing Messages
+The broker will not send pre existing messages through the mirror. So, If you
add mirror to your configuration and the journal had pre existing messages
these messages will not be sent.
+
+## Broker Connection Stop and Disconnect
+Once you start the broker connection with a mirror the mirror events will
always be sent to the intermediate queue configured at the
`source-mirror-address`.
+
+It is possible to stop the broker connection with the operation
stopBrokerConnection(connectionName) on the ServerControl, but it is only
effective to disconnect the brokers, while the mirror events are always
captured.
+
+## Disaster & Recovery Considerations
+As you use the mirror option to replicate data across datacenters, you have to
take a few considerations:
+
+* Currently we don't support quorums for activating the replica, so you have
to manually control when your clients connect to the replica site.
+* Make sure the replica site is passive. Having producers and consumers
connected into both sites would be messy and could lead you to data integrity
issues.
+ * You can disable auto-start on the acceptor your clients use to connect,
and only enable it after a disaster has occurred.
+* Only the queues and addresses are mirrored. Consumer states will have to be
reapplied on the replica when the clients reconnects (that applies to message
groups, exclusive consumers or anything related to clients)
+* Make sure your configuration options are copied over, including Diverts,
security, last value queues, address settings and other configuration options.
+* Have a way back route after a disaster.
+ * You can have a disabled broker connection to be enabled after the
disaster.
+
+
+## Mirror example with Failback
+On this example lets play with two brokers:
+- sourceBroker
+- replicaBroker
+
+Add this configuration on sourceBroker:
+
+```xml
+<broker-connections source-mirror-id="1">
+ <amqp-connection uri="tcp://replicaBroker:6700" name="DRSite">
+ <mirror message-acknowledgements="true"/>
+ <mirror target-mirror-id="2"/>
Review comment:
Mismatched indents, both wrong. Plus same questions as earlier mirror
config comments.
##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an
`<amqp-connection>` element within t
*Notice:* If you disable auto-start on the broker connection, the start of the
broker connection will only happen after the management method
`startBrokerConnection(connectionName)` is called on the ServerController.
-*Important*: The target endpoint needs permission for all operations that you
configure. Therefore, If you are using a security manager, ensure that you
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs
permission for all operations that you configure. Therefore, If you are using a
security manager, ensure that you perform the configured operations as a user
with sufficient permissions.
# AMQP Server Connection Operations
The following types of operations are supported on a AMQP server connection:
+* Mirrors
+ * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
* Senders
* Messages received on specific queues are transferred to another endpoint
* Receivers
* The broker pulls messages from another endpoint
* Peers
* The broker creates both senders and receivers on another endpoint that
knows how to handle them. Currently, this is implemented by Apache Qpid
Dispatch.
-* Mirrors
- * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
+## Mirrors
+The mirror option on the broker connection can capture events from the broker
and pass them over the wire to another broker. This enables you to capture
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored
on a local queue, and later forwarded to a target destination on another
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable
temporary queue to store messages before they are sent to the other broker. If
you define a name value for this property, an ANYCAST durable queue and address
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections source-mirror-id="1">
+ <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-mirror">
+ <mirror queue-removal="true" queue-creation="true"
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+ <mirror target-mirror-id="2"/>
+ </amqp-connection>
+</broker-connections>
+```
+
+*Important*: A broker can mirror to multiple replicas (1 to many). However a
replica broker can only have a single mirror source. Make sure you do not
mirror multiple source brokers to a single replica broker.
Review comment:
Is this still true? Isn't that part of what the extra mirror-ID and
horrible ID-munging is about handling?
##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an
`<amqp-connection>` element within t
*Notice:* If you disable auto-start on the broker connection, the start of the
broker connection will only happen after the management method
`startBrokerConnection(connectionName)` is called on the ServerController.
-*Important*: The target endpoint needs permission for all operations that you
configure. Therefore, If you are using a security manager, ensure that you
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs
permission for all operations that you configure. Therefore, If you are using a
security manager, ensure that you perform the configured operations as a user
with sufficient permissions.
# AMQP Server Connection Operations
The following types of operations are supported on a AMQP server connection:
+* Mirrors
+ * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
* Senders
* Messages received on specific queues are transferred to another endpoint
* Receivers
* The broker pulls messages from another endpoint
* Peers
* The broker creates both senders and receivers on another endpoint that
knows how to handle them. Currently, this is implemented by Apache Qpid
Dispatch.
-* Mirrors
- * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
+## Mirrors
+The mirror option on the broker connection can capture events from the broker
and pass them over the wire to another broker. This enables you to capture
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored
on a local queue, and later forwarded to a target destination on another
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable
temporary queue to store messages before they are sent to the other broker. If
you define a name value for this property, an ANYCAST durable queue and address
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections source-mirror-id="1">
+ <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-mirror">
+ <mirror queue-removal="true" queue-creation="true"
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+ <mirror target-mirror-id="2"/>
Review comment:
As above, is source-mirror-address still configurable?
Also, why are there 2 mirror elements on the same connection? What does that
even mean?
The 'target-mirror-id' option seemingly isn't covered in the doc, or the XSD
changes. Is it actually a thing or can this be removed?
##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -69,11 +167,12 @@ Some examples are shown below.
Using address expressions:
```xml
-<broker-connections>
+<broker-connections source-mirror-id="1">
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
<sender address-match="queues.#"/>
<!-- notice the local queues for remotequeues.# need to be created on
this broker -->
<receiver address-match="remotequeues.#"/>
+ <mirror target-mirror-id="2"/>
Review comment:
Why is a section about senders and receivers, i.e. not mirrors, adding mirror
config?
##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an
`<amqp-connection>` element within t
*Notice:* If you disable auto-start on the broker connection, the start of the
broker connection will only happen after the management method
`startBrokerConnection(connectionName)` is called on the ServerController.
-*Important*: The target endpoint needs permission for all operations that you
configure. Therefore, If you are using a security manager, ensure that you
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs
permission for all operations that you configure. Therefore, If you are using a
security manager, ensure that you perform the configured operations as a user
with sufficient permissions.
# AMQP Server Connection Operations
The following types of operations are supported on a AMQP server connection:
+* Mirrors
+ * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
* Senders
* Messages received on specific queues are transferred to another endpoint
* Receivers
* The broker pulls messages from another endpoint
* Peers
* The broker creates both senders and receivers on another endpoint that
knows how to handle them. Currently, this is implemented by Apache Qpid
Dispatch.
-* Mirrors
- * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
+## Mirrors
+The mirror option on the broker connection can capture events from the broker
and pass them over the wire to another broker. This enables you to capture
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored
on a local queue, and later forwarded to a target destination on another
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable
temporary queue to store messages before they are sent to the other broker. If
you define a name value for this property, an ANYCAST durable queue and address
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections source-mirror-id="1">
+ <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-mirror">
+ <mirror queue-removal="true" queue-creation="true"
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+ <mirror target-mirror-id="2"/>
+ </amqp-connection>
+</broker-connections>
+```
+
+*Important*: A broker can mirror to multiple replicas (1 to many). However a
replica broker can only have a single mirror source. Make sure you do not
mirror multiple source brokers to a single replica broker.
+
+### Pre Existing Messages
+The broker will not send pre existing messages through the mirror. So, If you
add mirror to your configuration and the journal had pre existing messages
these messages will not be sent.
+
+## Broker Connection Stop and Disconnect
+Once you start the broker connection with a mirror the mirror events will
always be sent to the intermediate queue configured at the
`source-mirror-address`.
Review comment:
Another reference to source-mirror-address
##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an
`<amqp-connection>` element within t
*Notice:* If you disable auto-start on the broker connection, the start of the
broker connection will only happen after the management method
`startBrokerConnection(connectionName)` is called on the ServerController.
-*Important*: The target endpoint needs permission for all operations that you
configure. Therefore, If you are using a security manager, ensure that you
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs
permission for all operations that you configure. Therefore, If you are using a
security manager, ensure that you perform the configured operations as a user
with sufficient permissions.
# AMQP Server Connection Operations
The following types of operations are supported on a AMQP server connection:
+* Mirrors
+ * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
* Senders
* Messages received on specific queues are transferred to another endpoint
* Receivers
* The broker pulls messages from another endpoint
* Peers
* The broker creates both senders and receivers on another endpoint that
knows how to handle them. Currently, this is implemented by Apache Qpid
Dispatch.
-* Mirrors
- * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
+## Mirrors
+The mirror option on the broker connection can capture events from the broker
and pass them over the wire to another broker. This enables you to capture
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored
on a local queue, and later forwarded to a target destination on another
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable
temporary queue to store messages before they are sent to the other broker. If
you define a name value for this property, an ANYCAST durable queue and address
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections source-mirror-id="1">
+ <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-mirror">
+ <mirror queue-removal="true" queue-creation="true"
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+ <mirror target-mirror-id="2"/>
+ </amqp-connection>
+</broker-connections>
+```
+
+*Important*: A broker can mirror to multiple replicas (1 to many). However a
replica broker can only have a single mirror source. Make sure you do not
mirror multiple source brokers to a single replica broker.
+
+### Pre Existing Messages
+The broker will not send pre existing messages through the mirror. So, If you
add mirror to your configuration and the journal had pre existing messages
these messages will not be sent.
+
+## Broker Connection Stop and Disconnect
+Once you start the broker connection with a mirror the mirror events will
always be sent to the intermediate queue configured at the
`source-mirror-address`.
+
+It is possible to stop the broker connection with the operation
stopBrokerConnection(connectionName) on the ServerControl, but it is only
effective to disconnect the brokers, while the mirror events are always
captured.
+
+## Disaster & Recovery Considerations
+As you use the mirror option to replicate data across datacenters, you have to
take a few considerations:
+
+* Currently we don't support quorums for activating the replica, so you have
to manually control when your clients connect to the replica site.
+* Make sure the replica site is passive. Having producers and consumers
connected into both sites would be messy and could lead you to data integrity
issues.
+ * You can disable auto-start on the acceptor your clients use to connect,
and only enable it after a disaster has occurred.
+* Only the queues and addresses are mirrored. Consumer states will have to be
reapplied on the replica when the clients reconnects (that applies to message
groups, exclusive consumers or anything related to clients)
+* Make sure your configuration options are copied over, including Diverts,
security, last value queues, address settings and other configuration options.
+* Have a way back route after a disaster.
+ * You can have a disabled broker connection to be enabled after the
disaster.
+
+
+## Mirror example with Failback
+On this example lets play with two brokers:
+- sourceBroker
+- replicaBroker
+
+Add this configuration on sourceBroker:
+
+```xml
+<broker-connections source-mirror-id="1">
+ <amqp-connection uri="tcp://replicaBroker:6700" name="DRSite">
+ <mirror message-acknowledgements="true"/>
+ <mirror target-mirror-id="2"/>
+ </amqp-connection>
+</broker-connections>
+```
+
+On the replicaBroker, add a disabled broker connection for failing back after
a disaster, and also set the acceptors with autoStart=false
+
+```xml
+
+<acceptors>
+ <!-- this one is for clients -->
+ <acceptor
name="artemis">tcp://0.0.0.0:61616?autoStart=false;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;autoStart=false</acceptor>
+ <!-- this one is for DR communication -->
+ <acceptor
name="amqp">tcp://0.0.0.0:6700?autoStart=true;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;autoStart=false</acceptor>
Review comment:
The autoStart option is defined twice in each acceptor. In the second
the two values actually conflict, one true and the other false.
Seems like it would generally be good to simplify this to the config of
actual interest, e.g. drop protocols not used or add a placeholder. Drop
unrelated config that should be defaulting to the same values and few people
should ever touch, such as...well most of these options really.
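For illustration only (not code from the PR): a minimal sketch of how repeated acceptor parameters like the ones above could be spotted. The class `AcceptorParamCheck` and its `duplicates` method are hypothetical, the URI in `main` is a shortened stand-in for the DR acceptor, and the sketch assumes parameters are separated by `;` as in the example. It only reports repeats; it makes no claim about which value the broker would honour.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Artemis.
public final class AcceptorParamCheck {

   // Returns each parameter that appears more than once, with all of its values in order.
   public static Map<String, List<String>> duplicates(String acceptorUri) {
      Map<String, List<String>> seen = new LinkedHashMap<>();
      int queryStart = acceptorUri.indexOf('?');
      if (queryStart >= 0) {
         for (String pair : acceptorUri.substring(queryStart + 1).split(";")) {
            if (pair.isEmpty()) {
               continue;
            }
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            seen.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
         }
      }
      // Keep only the parameters that were defined at least twice.
      seen.values().removeIf(values -> values.size() < 2);
      return seen;
   }

   public static void main(String[] args) {
      // Shortened stand-in for the DR acceptor URI quoted above.
      String dr = "tcp://0.0.0.0:6700?autoStart=true;protocols=AMQP;autoStart=false";
      System.out.println(duplicates(dr)); // prints {autoStart=[true, false]}
   }
}
```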
##########
File path:
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/containersupport/TestContainerBase.java
##########
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.containersupport;
+
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
Review comment:
Unused imports
##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -36,20 +39,115 @@ To define an AMQP broker connection, add an
`<amqp-connection>` element within t
*Notice:* If you disable auto-start on the broker connection, the start of the
broker connection will only happen after the management method
`startBrokerConnection(connectionName)` is called on the ServerController.
-*Important*: The target endpoint needs permission for all operations that you
configure. Therefore, If you are using a security manager, ensure that you
perform the configured operations as a user with sufficient permissions.
+*Important*: In addition to a matching mirror ID, the target endpoint needs
permission for all operations that you configure. Therefore, If you are using a
security manager, ensure that you perform the configured operations as a user
with sufficient permissions.
# AMQP Server Connection Operations
The following types of operations are supported on a AMQP server connection:
+* Mirrors
+ * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
* Senders
* Messages received on specific queues are transferred to another endpoint
* Receivers
* The broker pulls messages from another endpoint
* Peers
* The broker creates both senders and receivers on another endpoint that
knows how to handle them. Currently, this is implemented by Apache Qpid
Dispatch.
-* Mirrors
- * The broker uses an AMQP connection to another broker and duplicate
messages and sends acknowledgements over the wire.
+## Mirrors
+The mirror option on the broker connection can capture events from the broker
and pass them over the wire to another broker. This enables you to capture
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored
on a local queue, and later forwarded to a target destination on another
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable
temporary queue to store messages before they are sent to the other broker. If
you define a name value for this property, an ANYCAST durable queue and address
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections source-mirror-id="1">
+ <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-mirror">
+ <mirror queue-removal="true" queue-creation="true"
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+ <mirror target-mirror-id="2"/>
+ </amqp-connection>
+</broker-connections>
+```
+
+*Important*: A broker can mirror to multiple replicas (1 to many). However a
replica broker can only have a single mirror source. Make sure you do not
mirror multiple source brokers to a single replica broker.
+
+### Pre Existing Messages
+The broker will not send pre existing messages through the mirror. So, If you
add mirror to your configuration and the journal had pre existing messages
these messages will not be sent.
+
+## Broker Connection Stop and Disconnect
+Once you start the broker connection with a mirror the mirror events will
always be sent to the intermediate queue configured at the
`source-mirror-address`.
+
+It is possible to stop the broker connection with the operation
stopBrokerConnection(connectionName) on the ServerControl, but it is only
effective to disconnect the brokers, while the mirror events are always
captured.
+
+## Disaster & Recovery Considerations
+As you use the mirror option to replicate data across datacenters, you have to
take a few considerations:
+
+* Currently we don't support quorums for activating the replica, so you have
to manually control when your clients connect to the replica site.
+* Make sure the replica site is passive. Having producers and consumers
connected into both sites would be messy and could lead you to data integrity
issues.
+ * You can disable auto-start on the acceptor your clients use to connect,
and only enable it after a disaster has occurred.
+* Only the queues and addresses are mirrored. Consumer states will have to be
reapplied on the replica when the clients reconnects (that applies to message
groups, exclusive consumers or anything related to clients)
+* Make sure your configuration options are copied over, including Diverts,
security, last value queues, address settings and other configuration options.
+* Have a way back route after a disaster.
+ * You can have a disabled broker connection to be enabled after the
disaster.
+
+
+## Mirror example with Failback
+On this example lets play with two brokers:
+- sourceBroker
+- replicaBroker
+
+Add this configuration on sourceBroker:
+
+```xml
+<broker-connections source-mirror-id="1">
+ <amqp-connection uri="tcp://replicaBroker:6700" name="DRSite">
+ <mirror message-acknowledgements="true"/>
+ <mirror target-mirror-id="2"/>
+ </amqp-connection>
+</broker-connections>
+```
+
+On the replicaBroker, add a disabled broker connection for failing back after
a disaster, and also set the acceptors with autoStart=false
+
+```xml
+
+<acceptors>
+ <!-- this one is for clients -->
+ <acceptor
name="artemis">tcp://0.0.0.0:61616?autoStart=false;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;autoStart=false</acceptor>
+ <!-- this one is for DR communication -->
+ <acceptor
name="amqp">tcp://0.0.0.0:6700?autoStart=true;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;autoStart=false</acceptor>
+</acceptors>
+<broker-connections source-mirror-id="2">
+ <amqp-connection uri="tcp://sourceBroker:6700" name="failbackBroker"
auto-start="false">
+ <mirror message-acknowledgements="true"/>
+ <mirror target-mirror-id="1"/>
Review comment:
Indent is wrong. Plus same questions as earlier mirror config comments.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -202,8 +241,35 @@ private static Properties getProperties(Message message) {
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) throws
Exception {
+
+ MirrorController targetController = getControllerTarget();
+
+ if (targetController != null || ref.getQueue() != null &&
(ref.getQueue().isInternalQueue() || ref.getQueue().isMirrorController())) {
+ if (logger.isTraceEnabled()) {
+ logger.trace(server + " rejecting postAcknowledge queue=" +
ref.getQueue().getName() + ", ref=" + ref + " to avoid infinite loop with the
mirror (reflection)");
+ }
+ return;
+ }
+
+ if (logger.isTraceEnabled()) {
+ logger.trace(server + " postAcknowledge " + ref);
+ }
+
if (acks && !ref.getQueue().isMirrorController()) { // we don't call
postACK on snfqueues, otherwise we would get infinite loop because of this
feedback
- Message message = createMessage(ref.getQueue().getAddress(),
ref.getQueue().getName(), POST_ACK, ref.getMessage().getMessageID());
+ Long internalIDObject =
(Long)ref.getMessage().getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
+ long internalID;
+ if (internalIDObject == null) {
+ internalID = ByteUtil.mixByteAndLong(localMirrorId,
ref.getMessageID());
+ } else {
+ internalID = internalIDObject.longValue();
+ if (logger.isTraceEnabled()) {
+ logger.trace("server " + server + " acking message " + ref);
+ }
+ }
+ if (logger.isTraceEnabled()) {
+ logger.trace(server + " sending ack message from server " +
ByteUtil.getFirstByte(internalID) + " with messageID=" +
ByteUtil.removeFirstByte(internalID));
+ }
Review comment:
I continue to think this is a bad idea.
In this bit of code we munge together two values to form one (perhaps
dropping information from the original ID silently while doing so) unless that
was already done before...then either way we take that value and un-munge it in
order to log the two components. Then we send it over the wire, where the other
side will unmunge it and hopefully come up with 2 values it understands, and
end up doing a whole bunch of similar hoop jumping with them/it.
Meanwhile, on sending a regular message for delivery we use 2 entirely
separate delivery annotations to carry the pristine values without any need to
jump through hoops or possibly unwittingly throw away information from the ID.
Always transmitting the values separately and simplifying the whole situation
just seems like a far better way to go.
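To make the information-loss concern concrete, here is a minimal sketch. The class `MirrorIdPacking` and its methods are hypothetical; they assume the mirror ID is packed into the top byte of the 64-bit message ID, which may or may not match what `ByteUtil.mixByteAndLong`, `getFirstByte` and `removeFirstByte` actually do in the PR.

```java
// Hypothetical illustration, not the PR's ByteUtil implementation.
public final class MirrorIdPacking {

   private static final long LOW_56_BITS = 0x00FFFFFFFFFFFFFFL;

   // Assumed layout: mirror ID in the top byte, low 56 bits of the message ID below it.
   public static long mix(byte mirrorId, long messageId) {
      return ((long) (mirrorId & 0xFF) << 56) | (messageId & LOW_56_BITS);
   }

   public static byte firstByte(long mixed) {
      return (byte) (mixed >>> 56);
   }

   public static long removeFirstByte(long mixed) {
      return mixed & LOW_56_BITS;
   }

   public static void main(String[] args) {
      // A small ID survives the round trip.
      long packedSmall = mix((byte) 2, 12345L);
      System.out.println(firstByte(packedSmall) + " " + removeFirstByte(packedSmall)); // prints 2 12345

      // An ID that needs more than 56 bits is silently truncated.
      long large = (1L << 56) + 7;
      long packedLarge = mix((byte) 2, large);
      System.out.println(removeFirstByte(packedLarge) == large); // prints false
   }
}
```

Under that assumed layout, any message ID using the top byte cannot be recovered after packing, whereas two separate annotations would carry both values intact.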
##########
File path:
tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/containersupport/ContainerService.java
##########
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.containersupport;
+
+import javax.jms.ConnectionFactory;
+
+import java.io.File;
+import java.util.function.Consumer;
+
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.Assert;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.OutputFrame;
+import
org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+public abstract class ContainerService {
+
+ public static final ContainerService service;
+
+ static {
+ ContainerService loadingService;
+ try {
+ String providerName =
System.getProperty(ContainerService.class.getName() + ".service");
+ if (providerName == null) {
+ loadingService = new TestContainerImpl();
+ } else {
+ loadingService = (ContainerService)
Class.forName(providerName).newInstance();
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ loadingService = null;
+ }
+
+ service = loadingService;
+ }
+
+ public static ContainerService getService() {
+ return service;
+ }
+
+ public abstract Object newNetwork();
+
+ public abstract Object newBrokerImage();
+
+ public abstract Object newInterconnectImage();
+
+ public abstract void setNetwork(Object container, Object network);
+
+ public abstract void exposePorts(Object container, Integer... ports);
+
+ public abstract void exposeFile(Object container, String hostPath, String
containerPath);
+
+ public abstract void exposeFolder(Object container, String hostPath, String
containerPath);
+
+ public abstract void copyFileToContainer(Object container, String hostPath,
String containerPath);
+
+ public abstract void exposeBrokerHome(Object container, String brokerHome);
+
+ public abstract void startLogging(Object container, String prefix);
+
+ public abstract void start(Object container);
+
+ public abstract void kill(Object container);
+
+ public abstract void stop(Object container);
+
+ public abstract void restart(Object container);
+
+ public abstract int getPort(Object container, int mappedPort);
+
+ public abstract void exposeHosts(Object container, String... hosts);
+
+ public String getHost(Object container) {
+ return "localhost";
+ }
+
+ public abstract ConnectionFactory createCF(Object container, String
protocol);
+
+ public abstract String createURI(Object container, int port);
+
+ public abstract ConnectionFactory createCF(Object container, String
protocol, int port);
+ public abstract ConnectionFactory createCF(Object container, String
protocol, int port, String extraURL);
+
+ public boolean waitForServerToStart(Object container, String username,
String password, long timeout) throws InterruptedException {
+ long realTimeout = System.currentTimeMillis() + timeout;
+ while (System.currentTimeMillis() < realTimeout) {
+ try {
+ ConnectionFactory cf = createCF(container, "core");
+ cf.createConnection(username, password).close();
+ if (cf instanceof AutoCloseable) {
+ ((AutoCloseable)cf).close();
+ }
+ System.out.println("server started");
+ } catch (Exception e) {
+ System.out.println("awaiting server start at ");
+ Thread.sleep(500);
+ continue;
+ }
+ return true;
+ }
+
+ return false;
+ }
+
+ public abstract void logWait(Object container, String log);
+
+ private static class TestContainerImpl extends ContainerService {
+
+ @Override
+ public ConnectionFactory createCF(Object container, String protocol) {
+ return createCF(container, protocol, 61616);
+ }
+
+
+ @Override
+ public ConnectionFactory createCF(Object container, String protocol, int port) {
+ return CFUtil.createConnectionFactory("amqp", "tcp://" + getHost(container) + ":" + getPort(container, port));
+ }
+ @Override
+ public ConnectionFactory createCF(Object container, String protocol, int port, String extraURI) {
+ System.out.println("tcp://" + getHost(container) + ":" + getPort(container, port) + extraURI);
+ return CFUtil.createConnectionFactory("amqp", "tcp://" + getHost(container) + ":" + getPort(container, port) + extraURI);
+ }
+
+ @Override
+ public Object newNetwork() {
+ return Network.newNetwork();
+ }
+
+ @Override
+ public Object newBrokerImage() {
+ return new GenericContainer<>(DockerImageName.parse("artemis-centos"));
+ }
+
+ @Override
+ public Object newInterconnectImage() {
+ return new GenericContainer<>(DockerImageName.parse("quay.io/interconnectedcloud/qdrouterd:latest"));
+ }
+
+ @Override
+ public void setNetwork(Object container, Object network) {
+ ((GenericContainer)container).setNetwork((Network)network);
+ }
+
+ @Override
+ public void exposePorts(Object container, Integer... ports) {
+ ((GenericContainer)container).withExposedPorts(ports);
+ }
+
+ @Override
+ public void exposeFile(Object container, String hostPath, String containerPath) {
+ File file = new File(hostPath);
+ Assert.assertTrue(file.exists());
+ Assert.assertFalse(file.isDirectory());
+ ((GenericContainer)container).withFileSystemBind(hostPath, containerPath);
+ }
+
+ @Override
+ public void exposeFolder(Object container, String hostPath, String containerPath) {
+ File file = new File(hostPath);
+ Assert.assertTrue(file.exists());
+ Assert.assertTrue(file.isDirectory());
+ ((GenericContainer)container).withFileSystemBind(hostPath, containerPath);
+ }
+
+ @Override
+ public void copyFileToContainer(Object container, String hostPath, String containerPath) {
+ File file = new File(hostPath);
+ Assert.assertTrue(file.exists());
+ Assert.assertFalse(file.isDirectory());
+ ((GenericContainer)container).withCopyFileToContainer(MountableFile.forHostPath(hostPath), containerPath);
+ }
+
+ @Override
+ public void exposeBrokerHome(Object container, String brokerHome) {
+ exposeFolder(container, brokerHome, "/var/lib/artemis-instance");
+ }
+
+ @Override
+ public void start(Object container) {
+ ((GenericContainer)container).setStartupCheckStrategy(new IsRunningStartupCheckStrategy());
+ ((GenericContainer)container).start();
+ }
+
+ @Override
+ public void restart(Object containerObj) {
+ kill(containerObj);
+ start(containerObj);
+ }
+
+ @Override
+ public void kill(Object containerObj) {
+ GenericContainer container = (GenericContainer) containerObj;
+ container.getDockerClient().killContainerCmd(container.getContainerId()).exec();
+ container.stop();
+ }
+
+ @Override
+ public int getPort(Object container, int mappedPort) {
+ return ((GenericContainer)container).getMappedPort(mappedPort);
+ }
+
+ @Override
+ public void exposeHosts(Object container, String... hosts) {
+ ((GenericContainer)container).withNetworkAliases(hosts);
+ }
+
+ @Override
+ public void stop(Object container) {
+ if (container != null) {
+ ((GenericContainer) container).stop();
+ }
+ }
+
+ @Override
+ public void startLogging(Object container, String prefix) {
+ ((GenericContainer)container).withLogConsumer(new Consumer<OutputFrame>() {
+ @Override
+ public void accept(OutputFrame outputFrame) {
+ System.out.print(prefix + outputFrame.getUtf8String());
+ }
+ });
+ }
+
+ @Override
+ public void logWait(Object container, String log) {
+ LogMessageWaitStrategy logMessageWaitStrategy = new LogMessageWaitStrategy();
+ logMessageWaitStrategy.withRegEx(log);
+ ((GenericContainer)container).setWaitStrategy(logMessageWaitStrategy);
+ }
+
+ public String createURI(Object container, int port) {
Review comment:
Missing override
Issue Time Tracking
-------------------
Worklog Id: (was: 616597)
Time Spent: 2h 20m (was: 2h 10m)
> Enhance AMQP Mirror support with dual mirror
> --------------------------------------------
>
> Key: ARTEMIS-3243
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3243
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.17.0
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.18.0
>
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> In the current Mirror version, we can only mirror in a single direction.
> With this enhancement, the two (or more) brokers would be connected to each
> other, each one having its own ID, and each one would send updates to the
> other broker.
> The outcome is that if you just transferred producers and consumers from one
> broker to the other, the fallback would be automatic and simple. No need to
> disable and enable mirror options.