gemmellr commented on code in PR #5220: URL: https://github.com/apache/activemq-artemis/pull/5220#discussion_r1863334721
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java: ########## @@ -543,6 +598,17 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, reason); } + SimpleString noForwardSource = null; + if (Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD))) { + noForwardSource = (SimpleString) ref.getMessage().getBrokerProperty(NO_FORWARD_SOURCE); + String remoteMirrorId = getRemoteMirrorId(); + if (remoteMirrorId != null) { + if (!SimpleString.of(remoteMirrorId).equals(noForwardSource)) { + return; + } + } Review Comment: Probably worth a comment explaining why this is being checked, but ignored if we can't identify the remote mirror, and a trace log when returning because we can and it's not what we want. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java: ########## @@ -578,6 +644,9 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin String nodeID = idSupplier.getServerID(ref); // notice the brokerID will be null for any message generated on this broker. long internalID = idSupplier.getID(ref); Message messageCommand = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason); + if (noForwardSource != null) { + messageCommand.setBrokerProperty(RECEIVER_ID_FILTER, noForwardSource); + } Review Comment: This is always adding it, but we only really need it in the case where the mirror wasn't currently present / identifiable. May be worth only setting it when it is necessary?
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java: ########## @@ -467,6 +500,28 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier, } } + /** + * Checks if the message ref should be filtered or not. + * @param ref the message to check + * @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value + * than the remoteMirrorID, false otherwise. + */ + public boolean filterMessage(MessageReference ref) { Review Comment: Whilst this may stop the message actually being sent, it's not clear to me that it does anything about actually cleaning it up. As far as the broker is concerned it sent a message, but the other end can never acknowledge it since it never actually arrives as it wasn't sent... so is it just stuck in the structure as a delivered-but-not-acked message? Presumably the snf queue delivering metrics would show this (and be forever incorrect) if so.
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java: ########## @@ -338,6 +364,11 @@ Message copyMessageForPaging(Message message) { public void sendMessage(Transaction tx, Message message, RoutingContext context) { SimpleString address = context.getAddress(message); + if (isBlockedByNoForward(message)) { + logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server); Review Comment: Would replace "discarding the message" with "discarding send" like the other logging below, both for consistency, and because "discarding the message" might imply things that cause concern but aren't actually true (i.e., the actual message will be just fine and isn't being discarded). ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java: ########## @@ -77,29 +78,39 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD_SOURCE; import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.TARGET_QUEUES; import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT; import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT; -public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController { +public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements TargetMirrorController { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>(); + private static final ThreadLocal<TargetMirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>(); - public static void setControllerInUse(MirrorController controller) { + public static void setControllerInUse(TargetMirrorController controller) { CONTROLLER_THREAD_LOCAL.set(controller); } - public static MirrorController getControllerInUse() { + public static TargetMirrorController getControllerInUse() { return CONTROLLER_THREAD_LOCAL.get(); } + private boolean noMessageForwarding = false; Review Comment: Probably worth removing "Message" from the name; if the setting means it isn't forwarding _any_ of the commands, the field name shouldn't imply it is only affecting messages.
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java: ########## @@ -70,6 +70,8 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu"); public static final Symbol BROKER_ID = Symbol.getSymbol("x-opt-amq-bkr-id"); public static final SimpleString BROKER_ID_SIMPLE_STRING = SimpleString.of(BROKER_ID.toString()); + public static final SimpleString NO_FORWARD_SOURCE = SimpleString.of("x-opt-amq-no-forward-source"); + public static final SimpleString RECEIVER_ID_FILTER = SimpleString.of("x-opt-amq-receiver-id-filter"); Review Comment: As mirroring-specific message/delivery annotation keys, these should probably start with "x-opt-amq-**mr-**". Since they can go on the wire, it might be good to abbreviate where it's easy, e.g. no-fwd-src and rcv-id-filter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact