gemmellr commented on code in PR #5220:
URL: https://github.com/apache/activemq-artemis/pull/5220#discussion_r1758631836
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -430,24 +431,27 @@ private void doConnect() {
final Queue queue =
server.locateQueue(getMirrorSNF(replica));
final boolean coreTunnelingEnabled =
isCoreMessageTunnelingEnabled(replica);
- final Symbol[] desiredCapabilities;
+ ArrayList<Symbol> capabilities = new ArrayList<>();
+
capabilities.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
if (coreTunnelingEnabled) {
- desiredCapabilities = new Symbol[]
{AMQPMirrorControllerSource.MIRROR_CAPABILITY,
-
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
- } else {
- desiredCapabilities = new Symbol[]
{AMQPMirrorControllerSource.MIRROR_CAPABILITY};
+
capabilities.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
+ }
+ if (!replica.getCanForwardMessages()) {
+ capabilities.add(AMQPMirrorControllerSource.NO_FORWARD);
}
+ final Symbol[] desiredCapabilities = (Symbol[])
capabilities.toArray(new Symbol[]{});
+
final Symbol[] requiredOfferedCapabilities = new Symbol[]
{AMQPMirrorControllerSource.MIRROR_CAPABILITY};
connectSender(queue,
queue.getName().toString(),
mirrorControllerSource::setLink,
(r) ->
AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(),
r, getMirrorSNF(replica)),
server.getNodeID().toString(),
- desiredCapabilities,
- null,
+ desiredCapabilities,
Review Comment:
Extraneous space added; it seems this line doesn't need to change.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -430,24 +431,27 @@ private void doConnect() {
final Queue queue =
server.locateQueue(getMirrorSNF(replica));
final boolean coreTunnelingEnabled =
isCoreMessageTunnelingEnabled(replica);
- final Symbol[] desiredCapabilities;
+ ArrayList<Symbol> capabilities = new ArrayList<>();
Review Comment:
desiredCapabilitiesList might be more obvious for the relationship to the
array later
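
A minimal sketch of the rename, with plain strings standing in for the proton `Symbol` constants so it stays self-contained (the class name and string values are illustrative only). Note that `toArray(T[])` already returns `T[]`, so the `(Symbol[])` cast in the diff should not be needed:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in sketch: String replaces Symbol, and the values are placeholders.
class DesiredCapabilitiesSketch {
   static String[] build(boolean coreTunnelingEnabled, boolean noForward) {
      // The list name mirrors the array it is later converted into.
      List<String> desiredCapabilitiesList = new ArrayList<>();
      desiredCapabilitiesList.add("MIRROR_CAPABILITY");
      if (coreTunnelingEnabled) {
         desiredCapabilitiesList.add("CORE_MESSAGE_TUNNELING_SUPPORT");
      }
      if (noForward) {
         desiredCapabilitiesList.add("NO_FORWARD");
      }
      // toArray(T[]) returns T[], so no cast is required here.
      final String[] desiredCapabilities = desiredCapabilitiesList.toArray(new String[0]);
      return desiredCapabilities;
   }
}
```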
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -301,6 +303,10 @@ private boolean invalidTarget(MirrorController controller)
{
return controller != null && sameNode(getRemoteMirrorId(),
controller.getRemoteMirrorId());
}
+ private boolean isBlockedByNoForward(Message message) {
+ return
Boolean.valueOf(true).equals(message.getBrokerProperty(INTERNAL_NO_FORWARD));
Review Comment:
Boolean.TRUE
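
i.e. something along these lines. The map and key below are only stand-ins for `message.getBrokerProperty(INTERNAL_NO_FORWARD)` so the sketch runs on its own; the key value is made up:

```java
import java.util.Map;

// Stand-in sketch: a Map replaces the message's broker properties.
class NoForwardCheck {
   // Illustrative key only; the real constant's value is not shown in this diff.
   static final String INTERNAL_NO_FORWARD = "example.no.forward";

   // Boolean.TRUE.equals(...) is null-safe and avoids the Boolean.valueOf(true) call.
   // It returns false for a missing property or for any non-Boolean value.
   static boolean isBlockedByNoForward(Map<String, Object> brokerProperties) {
      return Boolean.TRUE.equals(brokerProperties.get(INTERNAL_NO_FORWARD));
   }
}
```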
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -265,6 +277,7 @@ protected void actualDelivery(Message message, Delivery
delivery, DeliveryAnnota
OperationContext oldContext = recoverContext();
incrementSettle();
+
Review Comment:
superfluous newline addition (some more later too)
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -1204,4 +1208,4 @@ private void doCloseConnector() {
});
}
}
-}
+}
Review Comment:
Can leave the newline at the end of the file; many editors add one
automatically, so it will probably just end up going back in and flip-flopping.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -534,6 +549,9 @@ private boolean sendMessage(Message message,
DeliveryAnnotations deliveryAnnotat
message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY,
internalMirrorID);
+ if (!this.canForwardMessages) {
+ message.setBrokerProperty(INTERNAL_NO_FORWARD, true);
Review Comment:
I wondered if the RoutingContext could be used for the 'don't [re-]mirror
this' handling. It already has functionality for doing that (see
isMirrorDisabled() and related).
However, it now also occurs to me that we possibly do need to mark the message
like this, simply because when that message is acknowledged later we probably
want to know that it was marked 'no forward', so that we don't mirror the
acknowledgement for it either, given we never mirrored the message itself to
begin with.
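
A rough sketch of that idea, using hypothetical stand-in types rather than the actual Artemis classes: the mark is stored on the message when it arrives over a 'no forward' link, and the same mark is consulted both when deciding whether to mirror the send and, later, whether to mirror the acknowledgement. Whether the source side should actually skip acknowledgements this way is my assumption here, not something the PR does today:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in sketch; Msg and the key are hypothetical, not Artemis API.
class NoForwardMarkSketch {
   static final String NO_FORWARD_KEY = "example.no.forward";

   static class Msg {
      final Map<String, Object> brokerProperties = new HashMap<>();
   }

   // Target side: persist the decision on the message itself.
   static void markOnArrival(Msg message, boolean noForward) {
      if (noForward) {
         message.brokerProperties.put(NO_FORWARD_KEY, Boolean.TRUE);
      }
   }

   private static boolean isNoForward(Msg message) {
      return Boolean.TRUE.equals(message.brokerProperties.get(NO_FORWARD_KEY));
   }

   // Source side: skip mirroring the send for marked messages.
   static boolean shouldMirrorSend(Msg message) {
      return !isNoForward(message);
   }

   // Source side, later: the ack is skipped too, since the message never went out.
   static boolean shouldMirrorAck(Msg message) {
      return !isNoForward(message);
   }
}
```

A per-delivery routing flag alone would not survive until acknowledgement time, which is why the mark lives on the message in this sketch.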
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -534,6 +549,9 @@ private boolean sendMessage(Message message,
DeliveryAnnotations deliveryAnnotat
message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY,
internalMirrorID);
+ if (!this.canForwardMessages) {
Review Comment:
can drop the "this."
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -248,6 +253,13 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback
sessionSPI,
this.configuration = server.getConfiguration();
this.referenceNodeStore =
sessionSPI.getProtocolManager().getReferenceIDSupplier();
mirrorContext = protonSession.getSessionSPI().getSessionContext();
+ if (receiver.getRemoteDesiredCapabilities() != null) {
+ for (Symbol capability : receiver.getRemoteDesiredCapabilities()) {
+ if (capability == NO_FORWARD) {
+ this.canForwardMessages = false;
+ }
Review Comment:
As discussed yesterday, this should respond by offering the capability to
indicate it supports it, and the origin should check that it receives the
offer and barf if not.
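
Roughly the following shape, sketched with plain strings in place of `Symbol` and hypothetical method names (nothing here is existing Artemis or proton API). On the origin side, the existing `requiredOfferedCapabilities` array passed to `connectSender` looks like a plausible place to hook the check in, though I have not verified that:

```java
import java.util.Arrays;

// Stand-in sketch of the desired/offered handshake; names are hypothetical.
class NoForwardNegotiationSketch {
   static final String NO_FORWARD = "NO_FORWARD"; // placeholder value

   // Target side: offer the capability back only if the peer desired it.
   static String[] offeredFor(String[] remoteDesiredCapabilities) {
      if (remoteDesiredCapabilities != null
            && Arrays.asList(remoteDesiredCapabilities).contains(NO_FORWARD)) {
         return new String[] {NO_FORWARD};
      }
      return new String[0];
   }

   // Origin side: fail the link if 'no forward' was requested but not offered,
   // e.g. because the remote broker is an older version that ignores it.
   static void verifyOffered(boolean noForwardRequested, String[] remoteOfferedCapabilities) {
      boolean offered = remoteOfferedCapabilities != null
            && Arrays.asList(remoteOfferedCapabilities).contains(NO_FORWARD);
      if (noForwardRequested && !offered) {
         throw new IllegalStateException("Remote peer did not offer " + NO_FORWARD);
      }
   }
}
```

Without the check, an origin configured for 'no forward' could silently mirror to a target that forwards anyway.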
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -92,6 +95,8 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL
= new ThreadLocal<>();
+ private boolean canForwardMessages = true;
Review Comment:
I would flip this to e.g. noForwardMessages, or change the option itself, to
match the same 'no forward' naming/sense used for the option. It simplifies
things to consider it all in the same sense, if there isn't a reason not to.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -779,4 +790,4 @@ public boolean isAlreadyAcked(Queue queue) {
-}
+}
Review Comment:
As before
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPChainedReplicaTest.java:
##########
@@ -196,4 +196,4 @@ public void testChained() throws Exception {
}
}
-}
+}
Review Comment:
Don't think we need to update this file for this change :)
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java:
##########
@@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends
AMQPBrokerConnectionEleme
boolean queueCreation = true;
+ boolean canForwardMessages = true;
Review Comment:
Similarly, I would keep the field in the same sense as the option, plus the XML
and config option should also use the same sense so that the defaults are more
describable for both the XML and broker-properties usage. Currently someone
using broker-properties would need to configure 'canForwardMessages = false'
whilst someone using XML would need to configure 'no-message-forwarding =
true' to flip what is actually the same shared behaviour.
##########
artemis-server/src/main/resources/schema/artemis-configuration.xsd:
##########
@@ -2290,6 +2290,14 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="no-message-forwarding" type="xsd:boolean"
use="optional" default="false">
+ <xsd:annotation>
+ <xsd:documentation>
+ If this is true, the mirror at the opposite end of the link
will not forward messages coming from that link to any other mirrors down the
line.
+ This is false by default.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
Review Comment:
This name+description, plus my earlier note about the acknowledgements, make
me realise it is still going to re-mirror other things like address and queue
creations, and more importantly deletions. That could still be quite
problematic, just like the message duplication itself, and likely worse due to
timing interactions between creation and deletion.
I'm not sure anything can be done about that except only using pre-created
everything. I am again sceptical of this topology.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]