gemmellr commented on code in PR #5220:
URL: https://github.com/apache/activemq-artemis/pull/5220#discussion_r1863334721
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -543,6 +598,17 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, reason);
}
+ SimpleString noForwardSource = null;
+ if (Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD))) {
+ noForwardSource = (SimpleString) ref.getMessage().getBrokerProperty(NO_FORWARD_SOURCE);
+ String remoteMirrorId = getRemoteMirrorId();
+ if (remoteMirrorId != null) {
+ if (!SimpleString.of(remoteMirrorId).equals(noForwardSource)) {
+ return;
+ }
+ }
Review Comment:
Probably worth a comment explaining why this is checked but ignored if we
can't identify the remote mirror, and a trace log when returning because we can
identify it and it's not the one we want.
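
A rough sketch of what I mean, written as a standalone helper so it can be read in isolation. It uses plain `String` in place of `SimpleString` and `java.util.logging` in place of the broker's logger; the class name, method name, and the rationale in the comment are my assumptions, not code from the PR:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the check added to preAcknowledge; not the PR's code.
final class NoForwardAckCheck {
   private static final Logger logger = Logger.getLogger(NoForwardAckCheck.class.getName());

   /**
    * Returns true when the ack should not be mirrored to this remote.
    *
    * @param remoteMirrorId  id of the remote mirror, or null if it cannot be identified
    * @param noForwardSource id recorded on the message as its no-forward source
    */
   static boolean shouldSkipAck(String remoteMirrorId, String noForwardSource) {
      // Assumed rationale (to be confirmed by the PR author): an ack for a no-forward
      // message is only of interest to the mirror the message came from. If the remote
      // mirror cannot be identified we cannot decide here, so the ack is let through.
      if (remoteMirrorId == null) {
         return false;
      }
      if (!remoteMirrorId.equals(noForwardSource)) {
         logger.log(Level.FINER,
                    "preAcknowledge::skipping ack, remote mirror {0} is not the no-forward source {1}",
                    new Object[] {remoteMirrorId, noForwardSource});
         return true;
      }
      return false;
   }
}
```

The point is only that the null case and the mismatch case are visibly distinct, with the silent return replaced by a logged one.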
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -578,6 +644,9 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
Message messageCommand = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+ if (noForwardSource != null) {
+ messageCommand.setBrokerProperty(RECEIVER_ID_FILTER, noForwardSource);
+ }
Review Comment:
This always adds it, but we only really need it when the mirror wasn't
currently present / identifiable. May be worth only setting it when
necessary?
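
Something along these lines, as a minimal sketch. A `Map` stands in for the message's broker properties and the helper name `buildAckCommand` is made up; only the `x-opt-amq-receiver-id-filter` key is taken from the PR as it currently stands:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for building the POST_ACK command; not the PR's code.
final class ReceiverFilterSketch {
   static final String RECEIVER_ID_FILTER = "x-opt-amq-receiver-id-filter";

   static Map<String, Object> buildAckCommand(String remoteMirrorId, String noForwardSource) {
      Map<String, Object> brokerProperties = new HashMap<>();
      // When remoteMirrorId is known, the earlier check has already decided whether
      // to send, so the filter only helps when the remote could not be identified.
      if (noForwardSource != null && remoteMirrorId == null) {
         brokerProperties.put(RECEIVER_ID_FILTER, noForwardSource);
      }
      return brokerProperties;
   }
}
```

That keeps the extra annotation off the wire in the common case where the sender could already make the decision itself.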
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -467,6 +500,28 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier,
}
}
+ /**
+ * Checks if the message ref should be filtered or not.
+ * @param ref the message to check
+ * @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value
+ * than the remoteMirrorID, false otherwise.
+ */
+ public boolean filterMessage(MessageReference ref) {
Review Comment:
Whilst this may stop the message actually being sent, it's not clear to me
that it does anything to actually clean it up. As far as the broker is concerned
it sent a message, but the other end can never acknowledge it, since it never
arrives because it wasn't sent... so is it just stuck in the structure as a
delivered-but-not-acked message? If so, presumably the SNF queue's delivering
metrics would show this (and be forever incorrect).
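
To make the concern concrete, here is a toy model. Nothing in it is Artemis API: `ToyQueue` and `deliverAll` are invented purely to show how a delivering count drifts when a filtered message is counted as delivered but never acknowledged by anyone:

```java
// Toy model only: none of these types or methods exist in Artemis.
final class ToyQueue {
   private int deliveringCount;

   // The queue hands a message to the sender and counts it as in flight.
   void deliver() {
      deliveringCount++;
   }

   // The message is acknowledged (by the remote, or locally by the sender).
   void acknowledge() {
      deliveringCount--;
   }

   int getDeliveringCount() {
      return deliveringCount;
   }

   // Simulates the sender: a filtered message is never sent, so no remote ack can arrive.
   static int deliverAll(ToyQueue queue, boolean[] filtered, boolean ackFilteredLocally) {
      for (boolean isFiltered : filtered) {
         queue.deliver();
         if (!isFiltered) {
            queue.acknowledge();   // sent; the toy remote acks immediately
         } else if (ackFilteredLocally) {
            queue.acknowledge();   // not sent, but cleaned up by the sender itself
         }
      }
      return queue.getDeliveringCount();
   }
}
```

With one sent and two filtered messages, the count stays at 2 unless the sender acknowledges the filtered ones itself, in which case it returns to 0. Whether the real sender needs an equivalent cleanup step is the question I'm asking.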
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -338,6 +364,11 @@ Message copyMessageForPaging(Message message) {
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
SimpleString address = context.getAddress(message);
+ if (isBlockedByNoForward(message)) {
+ logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server);
Review Comment:
Would replace "discarding the message" with "discarding send" like the other
logging below, both for consistency and because "discarding the message" might
imply things that cause concern but aren't actually true (i.e. the actual
message will be just fine and isn't being discarded).
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -77,29 +78,39 @@
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
+import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD;
+import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD_SOURCE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.TARGET_QUEUES;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
-public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
+public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements TargetMirrorController {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
+ private static final ThreadLocal<TargetMirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
- public static void setControllerInUse(MirrorController controller) {
+ public static void setControllerInUse(TargetMirrorController controller) {
CONTROLLER_THREAD_LOCAL.set(controller);
}
- public static MirrorController getControllerInUse() {
+ public static TargetMirrorController getControllerInUse() {
return CONTROLLER_THREAD_LOCAL.get();
}
+ private boolean noMessageForwarding = false;
Review Comment:
Probably worth removing "Message" from the name; if the setting means it
isn't forwarding _any_ of the commands, the field name shouldn't imply it only
affects messages.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -70,6 +70,8 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu");
public static final Symbol BROKER_ID = Symbol.getSymbol("x-opt-amq-bkr-id");
public static final SimpleString BROKER_ID_SIMPLE_STRING = SimpleString.of(BROKER_ID.toString());
+ public static final SimpleString NO_FORWARD_SOURCE = SimpleString.of("x-opt-amq-no-forward-source");
+ public static final SimpleString RECEIVER_ID_FILTER = SimpleString.of("x-opt-amq-receiver-id-filter");
Review Comment:
As mirroring-specific message/delivery annotation keys, these should probably
start with "x-opt-amq-**mr-**". Since they can go on the wire, it might be good
to abbreviate where it's easy, e.g. no-fwd-src and rcv-id-filter.
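
Putting the prefix and the abbreviations together, the keys could look roughly like this. Plain `String` constants are used so the snippet stands alone (the PR wraps them with `SimpleString.of`), and the holder class name is hypothetical; the exact names are of course the author's call:

```java
// Hypothetical key names combining the mirror prefix with the suggested abbreviations.
final class MirrorAnnotationKeys {
   // Prefix already used by other mirror keys such as "x-opt-amq-mr-qu".
   static final String MIRROR_PREFIX = "x-opt-amq-mr-";

   static final String NO_FORWARD_SOURCE = MIRROR_PREFIX + "no-fwd-src";
   static final String RECEIVER_ID_FILTER = MIRROR_PREFIX + "rcv-id-filter";
}
```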
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.