gemmellr commented on code in PR #5220: URL: https://github.com/apache/activemq-artemis/pull/5220#discussion_r1858946267
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -53,6 +53,7 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.transport.Target;

Review Comment:
   I think this will be unused; the MirrorSource shouldn't need to be dealing with Targets.

##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -306,10 +307,10 @@ public void validateMatching(Queue queue, AMQPBrokerConnectionElement connection
    public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) {
       if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) {
          Symbol[] dispatchCapability = new Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY};
-         connectSender(queue, queue.getAddress().toString(), null, null, null, null, dispatchCapability, null);
+         connectSender(queue, queue.getAddress().toString(), null, null,null, null, null, dispatchCapability, null);

Review Comment:
   There should be a space before the new null, like the surrounding arguments.

##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -543,6 +602,17 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
          logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, reason);
       }

+      SimpleString noForwardSource = null;
+      if (Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD))) {
+         noForwardSource = (SimpleString) ref.getMessage().getBrokerProperty(INTERNAL_NO_FORWARD_SOURCE);

Review Comment:
   I don't think we should be setting a SimpleString value for this... and now, looking at the other uses, it doesn't seem like it will be; it's set as a String originally? (Which makes sense, as it's ultimately coming from an AMQP link property, as a string value.)

##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -467,6 +504,28 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier,
       }
    }

+   /**
+    * Checks if the message ref should be filtered or not.
+    * @param ref the message to filter
+    * @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value
+    * that the remoteMirrorID, false otherwise.

Review Comment:
   (No need to keep the suggestion commit; it's just easier to show the suggested change this way.)

```suggestion
    * @param ref the message to check
    * @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value
    * than the remoteMirrorID, false otherwise.
```

##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -338,6 +367,12 @@ Message copyMessageForPaging(Message message) {
    public void sendMessage(Transaction tx, Message message, RoutingContext context) {
       SimpleString address = context.getAddress(message);

+      if (isBlockedByNoForward(message)) {
+         String remoteID = getRemoteMirrorId();

Review Comment:
   The variable looks unused; did you mean to add it to the logging?

##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -349,20 +512,41 @@ public void testProducerMessageIsMirroredWithoutCoreTunnelingUsesDefaultMessageF
       doTestProducerMessageIsMirroredWithCorrectMessageFormat(false);
    }

+   @Test
+   @Timeout(20)
+   public void testProducerMessageIsMirroredWithNoForwardAndTunneling() throws Exception {
+      doTestProducerMessageIsMirroredWithCorrectMessageFormat(true, true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void testProducerMessageIsMirroredWithNoForwardAndTunelingAndWithoutTunneling() throws Exception {

Review Comment:
   testProducerMessageIsMirroredWithNoForwardAndWithoutTunneling() ?

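Aside on the `INTERNAL_NO_FORWARD_SOURCE` cast flagged earlier in this review: a minimal, self-contained sketch of why the cast matters if the value is actually stored as a plain String. Everything here is hypothetical and for illustration only: `WrappedString` is a stand-in for a broker string wrapper type (it is not the Artemis `SimpleString`), the map stands in for message properties, and `NO_FORWARD_SOURCE_KEY` is an invented key.

```java
import java.util.HashMap;
import java.util.Map;

final class NoForwardSourceSketch {

    // Hypothetical stand-in for a string wrapper type; NOT the real Artemis class.
    static final class WrappedString {
        private final String value;

        WrappedString(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return value;
        }
    }

    // Hypothetical property key, invented for this sketch.
    static final String NO_FORWARD_SOURCE_KEY = "noForwardSource";

    // Fragile: assumes the stored value is a WrappedString and throws
    // ClassCastException when the value was stored as a plain String.
    static String readWithCast(Map<String, Object> props) {
        WrappedString source = (WrappedString) props.get(NO_FORWARD_SOURCE_KEY);
        return source == null ? null : source.toString();
    }

    // More tolerant: accepts whatever was stored and normalises it to a String.
    static String readAsString(Map<String, Object> props) {
        Object source = props.get(NO_FORWARD_SOURCE_KEY);
        return source == null ? null : source.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // The value is stored as a String, as the review comment suspects.
        props.put(NO_FORWARD_SOURCE_KEY, "broker-A");

        System.out.println(readAsString(props));

        try {
            readWithCast(props);
        } catch (ClassCastException e) {
            System.out.println("cast failed: value was stored as a String");
        }
    }
}
```

Whether the real code should read the property as a String or convert it at the point where it is set is a design choice for the PR; the sketch only shows that the writer and the reader need to agree on the type.
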
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -89,9 +90,15 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
    // Capabilities
    public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
    public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
+   public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");
+   public static final Symbol NO_FORWARD_SOURCE = Symbol.getSymbol("amq.no.forward.source");
+   public static final Symbol RECEIVER_ID_FILTER = Symbol.getSymbol("amq.receiver.id.filter");

Review Comment:
   NO_FORWARD_SOURCE and RECEIVER_ID_FILTER aren't being used as capabilities, so they shouldn't be in this group of constants, which is for capabilities. As a result, they also likely don't need to be of Symbol type.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org