gemmellr commented on code in PR #5220:
URL: https://github.com/apache/activemq-artemis/pull/5220#discussion_r1858946267
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -53,6 +53,7 @@
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.transport.Target;
Review Comment:
I think this will be unused; the MirrorSource shouldn't need to be dealing with
Targets.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -306,10 +307,10 @@ public void validateMatching(Queue queue,
AMQPBrokerConnectionElement connection
public void createLink(Queue queue, AMQPBrokerConnectionElement
connectionElement) {
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER)
{
Symbol[] dispatchCapability = new
Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY};
- connectSender(queue, queue.getAddress().toString(), null, null, null,
null, dispatchCapability, null);
+ connectSender(queue, queue.getAddress().toString(), null, null,null,
null, null, dispatchCapability, null);
Review Comment:
There should be a space before the new null, matching the surrounding arguments.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -543,6 +602,17 @@ public void preAcknowledge(final Transaction tx, final
MessageReference ref, fin
logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref,
reason);
}
+ SimpleString noForwardSource = null;
+ if
(Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD)))
{
+ noForwardSource = (SimpleString)
ref.getMessage().getBrokerProperty(INTERNAL_NO_FORWARD_SOURCE);
Review Comment:
I don't think we should be setting a SimpleString value for this... and now,
looking at the other uses, it doesn't seem like it will be: it's set as a String
originally? (Which makes sense, as it's ultimately coming from an AMQP link
property, as a string value).
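To illustrate the concern, here is a minimal, self-contained sketch. It assumes the value really is stored as a String, and it uses a plain Map as a stand-in for the message's broker properties. The class name, method name and key below are hypothetical, not Artemis API. Reading the value without casting to one specific type avoids a ClassCastException if the stored type differs from what the reader expects:

```java
import java.util.HashMap;
import java.util.Map;

public class NoForwardSourceSketch {

    // Hypothetical key, standing in for INTERNAL_NO_FORWARD_SOURCE.
    static final String NO_FORWARD_SOURCE_KEY = "noForwardSource";

    // Reads the value as a String without assuming its runtime type.
    // A hard cast to a single type would fail if the property had been
    // stored as some other CharSequence implementation.
    static String readNoForwardSource(Map<String, Object> brokerProperties) {
        Object value = brokerProperties.get(NO_FORWARD_SOURCE_KEY);
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(NO_FORWARD_SOURCE_KEY, "broker-A");
        System.out.println(readNoForwardSource(props));
    }
}
```

In the PR itself the simpler fix may just be to declare noForwardSource as a String, if that is indeed how the property is written.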
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -467,6 +504,28 @@ public static void
validateProtocolData(ReferenceIDSupplier referenceIDSupplier,
}
}
+ /**
+ * Checks if the message ref should be filtered or not.
+ * @param ref the message to filter
+ * @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the
message is set to a different value
+ * that the remoteMirrorID, false otherwise.
Review Comment:
(No need to keep the suggestion commit, it's just easier to show the
suggested change this way)
```suggestion
* @param ref the message to check
* @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the
message is set to a different value
* than the remoteMirrorID, false otherwise.
```
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -338,6 +367,12 @@ Message copyMessageForPaging(Message message) {
public void sendMessage(Transaction tx, Message message, RoutingContext
context) {
SimpleString address = context.getAddress(message);
+ if (isBlockedByNoForward(message)) {
+ String remoteID = getRemoteMirrorId();
Review Comment:
Variable looks unused, did you mean to add it to the logging?
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java:
##########
@@ -349,20 +512,41 @@ public void
testProducerMessageIsMirroredWithoutCoreTunnelingUsesDefaultMessageF
doTestProducerMessageIsMirroredWithCorrectMessageFormat(false);
}
+ @Test
+ @Timeout(20)
+ public void testProducerMessageIsMirroredWithNoForwardAndTunneling() throws
Exception {
+ doTestProducerMessageIsMirroredWithCorrectMessageFormat(true, true);
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testProducerMessageIsMirroredWithNoForwardAndTunelingAndWithoutTunneling()
throws Exception {
Review Comment:
testProducerMessageIsMirroredWithNoForwardAndWithoutTunneling() ?
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -89,9 +90,15 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
// Capabilities
public static final Symbol MIRROR_CAPABILITY =
Symbol.getSymbol("amq.mirror");
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY =
Symbol.valueOf("qd.waypoint");
+ public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");
+ public static final Symbol NO_FORWARD_SOURCE =
Symbol.getSymbol("amq.no.forward.source");
+ public static final Symbol RECEIVER_ID_FILTER =
Symbol.getSymbol("amq.receiver.id.filter");
Review Comment:
NO_FORWARD_SOURCE and RECEIVER_ID_FILTER aren't being used as capabilities,
so they shouldn't be in this group of constants that are for capabilities. They
also likely don't need to be of Symbol type as a result.
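One possible layout, as a rough sketch only. The class name and the group comments are hypothetical, and the capability constants are shown as plain Strings purely to keep the example free of the proton dependency (in the real class they would remain Symbols). How the two non-capability keys are best labelled is my assumption:

```java
public class MirrorConstantsSketch {

    // Capabilities (Symbol in the real class; String here only so the
    // sketch compiles without proton on the classpath).
    static final String MIRROR_CAPABILITY = "amq.mirror";
    static final String NO_FORWARD = "amq.no.forward";

    // Not capabilities: keys carrying plain string values, so they sit in
    // their own group and are declared as String.
    static final String NO_FORWARD_SOURCE = "amq.no.forward.source";
    static final String RECEIVER_ID_FILTER = "amq.receiver.id.filter";

    public static void main(String[] args) {
        System.out.println(NO_FORWARD_SOURCE + ", " + RECEIVER_ID_FILTER);
    }
}
```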
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]