gemmellr commented on code in PR #5220:
URL: https://github.com/apache/activemq-artemis/pull/5220#discussion_r1758631836


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -430,24 +431,27 @@ private void doConnect() {
                   final Queue queue = 
server.locateQueue(getMirrorSNF(replica));
 
                   final boolean coreTunnelingEnabled = 
isCoreMessageTunnelingEnabled(replica);
-                  final Symbol[] desiredCapabilities;
 
+                  ArrayList<Symbol> capabilities = new ArrayList<>();
+                  
capabilities.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
                   if (coreTunnelingEnabled) {
-                     desiredCapabilities = new Symbol[] 
{AMQPMirrorControllerSource.MIRROR_CAPABILITY,
-                                                         
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
-                  } else {
-                     desiredCapabilities = new Symbol[] 
{AMQPMirrorControllerSource.MIRROR_CAPABILITY};
+                     
capabilities.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
+                  }
+                  if (!replica.getCanForwardMessages()) {
+                     capabilities.add(AMQPMirrorControllerSource.NO_FORWARD);
                   }
 
+                  final Symbol[] desiredCapabilities = (Symbol[]) 
capabilities.toArray(new Symbol[]{});
+
                   final Symbol[] requiredOfferedCapabilities = new Symbol[] 
{AMQPMirrorControllerSource.MIRROR_CAPABILITY};
 
                   connectSender(queue,
                                 queue.getName().toString(),
                                 mirrorControllerSource::setLink,
                                 (r) -> 
AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(),
 r, getMirrorSNF(replica)),
                                 server.getNodeID().toString(),
-                                desiredCapabilities,
-                                null,
+                                 desiredCapabilities,

Review Comment:
   Extraneous space added, seems like this line doesnt need to change.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -430,24 +431,27 @@ private void doConnect() {
                   final Queue queue = 
server.locateQueue(getMirrorSNF(replica));
 
                   final boolean coreTunnelingEnabled = 
isCoreMessageTunnelingEnabled(replica);
-                  final Symbol[] desiredCapabilities;
 
+                  ArrayList<Symbol> capabilities = new ArrayList<>();

Review Comment:
   desiredCapabilitiesList might be more obvious for the relationship to the 
array later



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -301,6 +303,10 @@ private boolean invalidTarget(MirrorController controller) 
{
       return controller != null && sameNode(getRemoteMirrorId(), 
controller.getRemoteMirrorId());
    }
 
+   private boolean isBlockedByNoForward(Message message) {
+      return 
Boolean.valueOf(true).equals(message.getBrokerProperty(INTERNAL_NO_FORWARD));

Review Comment:
   Boolean.TRUE



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -265,6 +277,7 @@ protected void actualDelivery(Message message, Delivery 
delivery, DeliveryAnnota
       OperationContext oldContext = recoverContext();
       incrementSettle();
 
+

Review Comment:
   superfluous newline addition (some more later too)



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -1204,4 +1208,4 @@ private void doCloseConnector() {
          });
       }
    }
-}
+}

Review Comment:
   Can leave newlines at end, many editors will so it will probably just end up 
going back in and flip flop.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -534,6 +549,9 @@ private boolean sendMessage(Message message, 
DeliveryAnnotations deliveryAnnotat
 
       message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
       message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, 
internalMirrorID);
+      if (!this.canForwardMessages) {
+         message.setBrokerProperty(INTERNAL_NO_FORWARD, true);

Review Comment:
   I wondered if the RoutingContext could be used for the 'dont [re-]mirror 
this' handling. It already has functionality for doing that (see 
isMirrorDisabled() and related).
   
   However it now also occurs to me, we possibly do need to mark the message 
like this simply because when that message is acknowledged later, we probably 
want to know then that it was 'no forward' now, so that we dont mirror 
acknowledgement for it either given we never mirrored the message itself to 
begin with.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -534,6 +549,9 @@ private boolean sendMessage(Message message, 
DeliveryAnnotations deliveryAnnotat
 
       message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
       message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, 
internalMirrorID);
+      if (!this.canForwardMessages) {

Review Comment:
   can drop the "this."



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -248,6 +253,13 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback 
sessionSPI,
       this.configuration = server.getConfiguration();
       this.referenceNodeStore = 
sessionSPI.getProtocolManager().getReferenceIDSupplier();
       mirrorContext = protonSession.getSessionSPI().getSessionContext();
+      if (receiver.getRemoteDesiredCapabilities() != null) {
+         for (Symbol capability : receiver.getRemoteDesiredCapabilities()) {
+            if (capability == NO_FORWARD) {
+               this.canForwardMessages = false;
+            }

Review Comment:
   As discussed yesterday, this should send back indicating it offers the 
support, and the origin should check it receives the offer and barf if not.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java:
##########
@@ -92,6 +95,8 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
 
    private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL 
= new ThreadLocal<>();
 
+   private boolean canForwardMessages = true;

Review Comment:
   I would flip this to e.g noForwardMessages, or change the option itself, to 
match the same 'no forward' naming/sense used for the option. Simplifies things 
considering it all in the same sense, if there isnt reason not to.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -779,4 +790,4 @@ public boolean isAlreadyAcked(Queue queue) {
 
 
 
-}
+}

Review Comment:
   As before



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPChainedReplicaTest.java:
##########
@@ -196,4 +196,4 @@ public void testChained() throws Exception {
       }
 
    }
-}
+}

Review Comment:
   Dont think we need to update this file for this change :)



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java:
##########
@@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends 
AMQPBrokerConnectionEleme
 
    boolean queueCreation = true;
 
+   boolean canForwardMessages = true;

Review Comment:
   similarly I would keep the field the same sense as the option, plus the XML 
and config option should also use the same sense so that the defaults are more 
describable for both the XML and broker-properties usage. Currently someone 
using broker-properties would need to configure 'canForwardMessages = false' 
whilst someone using XML would need to configure 'no-messages-forwarding = 
true' to flip their actually-shared behaviour.



##########
artemis-server/src/main/resources/schema/artemis-configuration.xsd:
##########
@@ -2290,6 +2290,14 @@
             </xsd:documentation>
          </xsd:annotation>
       </xsd:attribute>
+      <xsd:attribute name="no-message-forwarding" type="xsd:boolean" 
use="optional" default="false">
+         <xsd:annotation>
+            <xsd:documentation>
+               If this is true, the mirror at the opposite end of the link 
will not forward messages coming from that link to any other mirrors down the 
line.
+               This is false by default.
+            </xsd:documentation>
+         </xsd:annotation>
+      </xsd:attribute>

Review Comment:
   This name+description, plus my earlier note about the acknowledgements, make 
me realise it is still going to re-mirror other things like address and queue 
creations, and more importantly deletions. Which could still be quite 
problematic just like the message-duping themselves, likely worse due to timing 
interactions of creation and deletion.
   
   I'm not sure anything can be done about that except only using pre-created 
everything. I am again sceptical of this topology.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to