gemmellr commented on code in PR #5220: URL: https://github.com/apache/activemq-artemis/pull/5220#discussion_r1775020313
########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AMQPSquareMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + protected static final int AMQP_PORT_4 = 5675; + + ActiveMQServer server_2; + ActiveMQServer server_3; + ActiveMQServer server_4; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + protected String getConfiguredProtocols() { + return "AMQP,CORE,OPENWIRE"; + } + + @Test + public void testSquare() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + server_4 = createServer(AMQP_PORT_4, false); Review Comment: I'd also try adding some testing using the ProtonJ2 test peers, rather than all brokers, to validate and exercise the actual broker behaviour at the protocol level. It gives a better accounting of what is really going on and when it changes unexpectedly. E.g it seems all but certain these brokers are currently forwarding around mirrored acks for noForward messages they didn't originally mirror to begin with, but this test will not notice that one way or the other. Similarly with various other potentially unexpected behaviours that might occur. ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java: ########## @@ -65,10 +65,14 @@ default void reloadAddressInfo(AddressInfo addressInfo) throws Exception { AddressInfo removeAddressInfo(SimpleString address, boolean force) throws Exception; + AddressInfo removeAddressInfo(SimpleString address, boolean force, boolean noForward) throws Exception; Review Comment: I think this would be the first main case of directly exposing mirroring related config/behaviour directly into general APIs of post office and server around addresses and queues. In some ways its the slightly more efficient way to do this, though as adding/removing addresses and queues is usually a relatively less frequent activity than e.g actual message transit I'm not sure that slight efficiency is really a concern here. I'd try out alternatives that dont need these specific APIs. I was more expecting something like, or just actual usage of, the existing mechanism already in place to let the mirroring decide not to send a given mirrored message/command action to given mirrors (i.e not back to themselves), where it tracks [\[1\]](https://github.com/apache/activemq-artemis/blob/9bb63b656f64071f92fb7b7846e6d6c274931f64/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java#L269) that it is operating for an arriving mirrored operation and can then react to that later [\[2\]](https://github.com/apache/activemq-artemis/blob/9bb63b656f64071f92fb7b7846e6d6c274931f64/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java#L270). That or something like the way the RoutingContext has options to influence mirroring behaviour. \[1\] https://github.com/apache/activemq-artemis/blob/9bb63b656f64071f92fb7b7846e6d6c274931f64/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java#L269 \[2\] https://github.com/apache/activemq-artemis/blob/9bb63b656f64071f92fb7b7846e6d6c274931f64/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java#L270 ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -248,6 +253,13 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI, this.configuration = server.getConfiguration(); this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier(); mirrorContext = protonSession.getSessionSPI().getSessionContext(); + if (receiver.getRemoteDesiredCapabilities() != null) { + for (Symbol capability : receiver.getRemoteDesiredCapabilities()) { + if (capability == NO_FORWARD) { Review Comment: Best to use e.g NO_FORWARD.equals(capability) even for symbols. Better yet, replace the whole outer if with AmqpSupport.verifyDesiredCapability(Link, Symbol) ########## artemis-server/src/main/resources/schema/artemis-configuration.xsd: ########## @@ -2290,6 +2290,14 @@ </xsd:documentation> </xsd:annotation> </xsd:attribute> + <xsd:attribute name="no-message-forwarding" type="xsd:boolean" use="optional" default="false"> + <xsd:annotation> + <xsd:documentation> + If this is true, the mirror at the opposite end of the link will not forward messages coming from that link to any other mirrors down the line. + This is false by default. + </xsd:documentation> + </xsd:annotation> + </xsd:attribute> Review Comment: With the change to limit command forwarding as well, the option should probably be changed to just "no-forward[ing]", i.e remove the "-messages", and description updated accordingly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact