gemmellr commented on code in PR #5220:
URL: https://github.com/apache/activemq-artemis/pull/5220#discussion_r1863334721


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -543,6 +598,17 @@ public void preAcknowledge(final Transaction tx, final 
MessageReference ref, fin
          logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, 
reason);
       }
 
+      SimpleString noForwardSource = null;
+      if 
(Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD))) 
{
+         noForwardSource = (SimpleString) 
ref.getMessage().getBrokerProperty(NO_FORWARD_SOURCE);
+         String remoteMirrorId = getRemoteMirrorId();
+         if (remoteMirrorId != null) {
+            if (!SimpleString.of(remoteMirrorId).equals(noForwardSource)) {
+               return;
+            }
+         }

Review Comment:
   Probably worth a comment why this is being checked, but ignored if we cant 
tell the remote mirror, and a trace log when returning because we can and its 
not what we want.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -578,6 +644,9 @@ public void preAcknowledge(final Transaction tx, final 
MessageReference ref, fin
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
       Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (noForwardSource != null) {
+         messageCommand.setBrokerProperty(RECEIVER_ID_FILTER, noForwardSource);
+      }

Review Comment:
   This is always adding it, but we only really need it in the case the mirror 
wasnt currently present / identifiable. May be worth only setting it when it 
was necessary?



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -467,6 +500,28 @@ public static void 
validateProtocolData(ReferenceIDSupplier referenceIDSupplier,
       }
    }
 
+   /**
+    * Checks if the message ref should be filtered or not.
+    * @param ref the message to check
+    * @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the 
message is set to a different value
+    * than the remoteMirrorID, false otherwise.
+    */
+   public boolean filterMessage(MessageReference ref) {

Review Comment:
   Whilst this may stop the message actually being sent, its not clear to me it 
does anything around actually cleaning it up? As far as the broker is concerned 
it sent a message, but the other end can never acknowledge it since it never 
actually arrives as it wasnt sent...so is it just stuck in the structure as a 
delivered-but-not-acked message? Presumably the snf queue delivering metrics 
would show this (and and be forever incorrect) if so.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -338,6 +364,11 @@ Message copyMessageForPaging(Message message) {
    public void sendMessage(Transaction tx, Message message, RoutingContext 
context) {
       SimpleString address = context.getAddress(message);
 
+      if (isBlockedByNoForward(message)) {
+         logger.trace("sendMessage::server {} is discarding the message 
because its source is setting a noForward policy", server);

Review Comment:
   Would replace "discarding the message" with "discarding send" like the other 
logging below, both for consistency, and because "discarding the message" might 
imply things that cause concern but aren't actually true (i.e the actual 
message will be just fine and isnt being discarded)



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -77,29 +78,39 @@
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD_SOURCE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.TARGET_QUEUES;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
 
-public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver 
implements MirrorController {
+public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver 
implements TargetMirrorController {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-   private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL 
= new ThreadLocal<>();
+   private static final ThreadLocal<TargetMirrorController> 
CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
 
-   public static void setControllerInUse(MirrorController controller) {
+   public static void setControllerInUse(TargetMirrorController controller) {
       CONTROLLER_THREAD_LOCAL.set(controller);
    }
 
-   public static MirrorController getControllerInUse() {
+   public static TargetMirrorController getControllerInUse() {
       return CONTROLLER_THREAD_LOCAL.get();
    }
 
+   private boolean noMessageForwarding = false;

Review Comment:
   Probably worth removing "Message" from the name; if the settings means it 
isnt forwarding _any_ of the commands, the field name shouldnt imply it is only 
affecting messages.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -70,6 +70,8 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
    public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu");
    public static final Symbol BROKER_ID = Symbol.getSymbol("x-opt-amq-bkr-id");
    public static final SimpleString BROKER_ID_SIMPLE_STRING = 
SimpleString.of(BROKER_ID.toString());
+   public static final SimpleString NO_FORWARD_SOURCE = 
SimpleString.of("x-opt-amq-no-forward-source");
+   public static final SimpleString RECEIVER_ID_FILTER = 
SimpleString.of("x-opt-amq-receiver-id-filter");

Review Comment:
   As mirroring-specific message/delivery annotation keys these should probably 
start "x-opt-amq-**mr-**". Since they can go on the wire might be good to 
abbreviate where its easy, e.g no-fwd-src and rcv-id-filter



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to