gemmellr commented on code in PR #5220: URL: https://github.com/apache/activemq-artemis/pull/5220#discussion_r1758631836
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java: ########## @@ -430,24 +431,27 @@ private void doConnect() { final Queue queue = server.locateQueue(getMirrorSNF(replica)); final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica); - final Symbol[] desiredCapabilities; + ArrayList<Symbol> capabilities = new ArrayList<>(); + capabilities.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY); if (coreTunnelingEnabled) { - desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, - AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT}; - } else { - desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; + capabilities.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT); + } + if (!replica.getCanForwardMessages()) { + capabilities.add(AMQPMirrorControllerSource.NO_FORWARD); } + final Symbol[] desiredCapabilities = (Symbol[]) capabilities.toArray(new Symbol[]{}); + final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; connectSender(queue, queue.getName().toString(), mirrorControllerSource::setLink, (r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)), server.getNodeID().toString(), - desiredCapabilities, - null, + desiredCapabilities, Review Comment: Extraneous space added, seems like this line doesnt need to change. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java: ########## @@ -430,24 +431,27 @@ private void doConnect() { final Queue queue = server.locateQueue(getMirrorSNF(replica)); final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica); - final Symbol[] desiredCapabilities; + ArrayList<Symbol> capabilities = new ArrayList<>(); Review Comment: desiredCapabilitiesList might be more obvious for the relationship to the array later ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java: ########## @@ -301,6 +303,10 @@ private boolean invalidTarget(MirrorController controller) { return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId()); } + private boolean isBlockedByNoForward(Message message) { + return Boolean.valueOf(true).equals(message.getBrokerProperty(INTERNAL_NO_FORWARD)); Review Comment: Boolean.TRUE ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -265,6 +277,7 @@ protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnota OperationContext oldContext = recoverContext(); incrementSettle(); + Review Comment: superfluous newline addition (some more later too) ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java: ########## @@ -1204,4 +1208,4 @@ private void doCloseConnector() { }); } } -} +} Review Comment: Can leave newlines at end, many editors will so it will probably just end up going back in and flip flop. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -534,6 +549,9 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID); message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID); + if (!this.canForwardMessages) { + message.setBrokerProperty(INTERNAL_NO_FORWARD, true); Review Comment: I wondered if the RoutingContext could be used for the 'dont [re-]mirror this' handling. It already has functionality for doing that (see isMirrorDisabled() and related). However it now also occurs to me, we possibly do need to mark the message like this simply because when that message is acknowledged later, we probably want to know then that it was 'no forward' now, so that we dont mirror acknowledgement for it either given we never mirrored the message itself to begin with. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -534,6 +549,9 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID); message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID); + if (!this.canForwardMessages) { Review Comment: can drop the "this." ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -248,6 +253,13 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI, this.configuration = server.getConfiguration(); this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier(); mirrorContext = protonSession.getSessionSPI().getSessionContext(); + if (receiver.getRemoteDesiredCapabilities() != null) { + for (Symbol capability : receiver.getRemoteDesiredCapabilities()) { + if (capability == NO_FORWARD) { + this.canForwardMessages = false; + } Review Comment: As discussed yesterday, this should send back indicating it offers the support, and the origin should check it receives the offer and barf if not. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -92,6 +95,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>(); + private boolean canForwardMessages = true; Review Comment: I would flip this to e.g noForwardMessages, or change the option itself, to match the same 'no forward' naming/sense used for the option. Simplifies things considering it all in the same sense, if there isnt reason not to. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java: ########## @@ -779,4 +790,4 @@ public boolean isAlreadyAcked(Queue queue) { -} +} Review Comment: As before ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPChainedReplicaTest.java: ########## @@ -196,4 +196,4 @@ public void testChained() throws Exception { } } -} +} Review Comment: Dont think we need to update this file for this change :) ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java: ########## @@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme boolean queueCreation = true; + boolean canForwardMessages = true; Review Comment: similarly I would keep the field the same sense as the option, plus the XML and config option should also use the same sense so that the defaults are more describable for both the XML and broker-properties usage. Currently someone using broker-properties would need to configure 'canForwardMessages = false' whilst someone using XML would need to configure 'no-messages-forwarding = true' to flip their actually-shared behaviour. ########## artemis-server/src/main/resources/schema/artemis-configuration.xsd: ########## @@ -2290,6 +2290,14 @@ </xsd:documentation> </xsd:annotation> </xsd:attribute> + <xsd:attribute name="no-message-forwarding" type="xsd:boolean" use="optional" default="false"> + <xsd:annotation> + <xsd:documentation> + If this is true, the mirror at the opposite end of the link will not forward messages coming from that link to any other mirrors down the line. + This is false by default. + </xsd:documentation> + </xsd:annotation> + </xsd:attribute> Review Comment: This name+description, plus my earlier note about the acknowledgements, make me realise it is still going to re-mirror other things like address and queue creations, and more importantly deletions. Which could still be quite problematic just like the message-duping themselves, likely worse due to timing interactions of creation and deletion. I'm not sure anything can be done about that except only using pre-created everything. I am again sceptical of this topology. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact