gemmellr commented on code in PR #5220: URL: https://github.com/apache/activemq-artemis/pull/5220#discussion_r1750453295
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java: ########## @@ -92,6 +92,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString()); public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString()); + public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward"); Review Comment: Given this will be specific to the mirroring, I'd wind that into the value so its also specific. This should be moved 4 lines up beside the related main MIRROR_CAPABILITY. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQSquareMirroringTest.java: ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import javax.jms.*; + +import static org.junit.jupiter.api.Assertions.*; + +public class AMQSquareMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + protected static final int AMQP_PORT_4 = 5675; + + ActiveMQServer server_2; + ActiveMQServer server_3; + ActiveMQServer server_4; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + @Test + public void testSquare() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + server_4 = createServer(AMQP_PORT_4, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + server_4.getConfiguration().setName("4"); + + /** + * 1 <----> 2 + * ^ ^ + * | | + * v v + * 4 <----> 3 + */ + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100).setReconnectAttempts(-1); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration("to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100).setReconnectAttempts(-1); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setForward(false)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100).setReconnectAttempts(-1); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration("to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100).setReconnectAttempts(-1); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100).setReconnectAttempts(-1); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_3.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration("to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100).setReconnectAttempts(-1); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_3.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100).setReconnectAttempts(-1); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_4.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration("to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100).setReconnectAttempts(-1); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_4.getConfiguration().addAMQPConnection(amqpConnection); + } + + server.start(); + server_2.start(); + server_3.start(); + server_4.start(); + + createAddressAndQueues(server); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_4.locateQueue(getQueueName()) != null); + + Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + Queue q2 = server.locateQueue(getQueueName()); + assertNotNull(q2); + + Queue q3 = server.locateQueue(getQueueName()); + assertNotNull(q3); + + Queue q4 = server.locateQueue(getQueueName()); + assertNotNull(q4); + + ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT+"?amqp.idleTimeout=-1"); + ConnectionFactory factory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2+"?amqp.idleTimeout=-1"); + ConnectionFactory factory3 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_3+"?amqp.idleTimeout=-1"); + ConnectionFactory factory4 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_4+"?amqp.idleTimeout=-1"); Review Comment: Should probably drop the idleTimeout disabling from the final version. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQSquareMirroringTest.java: ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import javax.jms.*; + +import static org.junit.jupiter.api.Assertions.*; + +public class AMQSquareMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + protected static final int AMQP_PORT_4 = 5675; + + ActiveMQServer server_2; + ActiveMQServer server_3; + ActiveMQServer server_4; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + @Test + public void testSquare() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + server_4 = createServer(AMQP_PORT_4, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + server_4.getConfiguration().setName("4"); + + /** + * 1 <----> 2 + * ^ ^ + * | | + * v v + * 4 <----> 3 + */ + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100).setReconnectAttempts(-1); Review Comment: Rather than just "to2" etc, each of which you use more than once, I'd include both the source and target ID so that every name is different. Personally I'd also use e.g getTestMethodName() value as a prefix to make them even more to the point. We shouldnt use -1 reconnect attempts, if something barfs in a way anything leaks (shouldn't..but stuff that shouldn't happen, often does..) it could sit around reconnecting and interfering with other tests later (which is when the unique names, including the test names, becomes especially helpful). ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -264,6 +265,14 @@ public void flow() { protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnotations deliveryAnnotations, Receiver receiver, Transaction tx) { OperationContext oldContext = recoverContext(); incrementSettle(); + boolean noForward = false; + if (receiver.getRemoteDesiredCapabilities() != null) { + for (Symbol capability : receiver.getRemoteDesiredCapabilities()) { + if (capability == AMQPMirrorControllerSource.NO_FORWARD) { + noForward = true; + } + } + } Review Comment: This isnt going to change for the duration of the link, so it makes sense to evaluate it once at the start when initializing the mirror target, rather than determining the same result repeatedly for each arriving message. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -512,6 +521,12 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat routingContext.setDuplicateDetection(false); // we do our own duplicate detection here + if (noForward) { Review Comment: Also, this seems to be too early to check this. It cant know whether 'mirror forwarding' will occur until during the routing stages later in this method, which is where mirroring handling occurs. Returning here seems like it would mean it drops all messages, even from _this_ broker they were being mirrored to. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQSquareMirroringTest.java: ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import javax.jms.*; + +import static org.junit.jupiter.api.Assertions.*; + +public class AMQSquareMirroringTest extends AmqpClientTestSupport { Review Comment: All the existing tests in the package use AMQ\*P\*FooTest or just FooTest. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQSquareMirroringTest.java: ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import javax.jms.*; + +import static org.junit.jupiter.api.Assertions.*; Review Comment: Lets add the actual imports rather than * ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java: ########## @@ -75,6 +77,12 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation) return this; } + public boolean isForward () { return forward; } Review Comment: Same. Its not allowed by the checkstyle config anyways. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact