[ 
https://issues.apache.org/jira/browse/ARTEMIS-4136?focusedWorklogId=840687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840687
 ]

ASF GitHub Bot logged work on ARTEMIS-4136:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Jan/23 16:10
            Start Date: 20/Jan/23 16:10
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1082720217


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +397,178 @@ public void postAcknowledge(MessageReference ref, 
AckReason reason) throws Excep
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(), 
ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("preAcknowledge::afterStoreOperation for 
messageReference {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      // This pair contains the Message used to generate the command towards 
the target, the reference being acked
+      final List<Pair<Message, MessageReference>> acks = new ArrayList<>();

Review Comment:
   Is either the reference or the message indexable so a map work at that point 
rather than the intermediate objects to add both to the list?
   
   Is use of this list thread safe? Will adds and removes always be on the same 
thread?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java:
##########
@@ -698,7 +700,7 @@ private void replicaTest(boolean largeMessage,
       Connection connection = factory.createConnection();
       Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
       MessageProducer producer = 
session.createProducer(session.createQueue(getQueueName()));
-      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+      producer.setDeliveryMode(DeliveryMode.PERSISTENT);

Review Comment:
   Could also just remove the line.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +404,170 @@ public void postAcknowledge(MessageReference ref, 
AckReason reason) throws Excep
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(), 
ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("preAcknowledge::afterStoreOperation for 
messageReference {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      // This pair contains the Message used to generate the command towards 
the target, the reference being acked
+      final List<Pair<Message, MessageReference>> acks = new ArrayList<>();
+
+      MirrorACKOperation(ActiveMQServer server) {
+         this.server = server;
+      }
+
+      /**
+       *
+       * @param message the message with the instruction to ack on the target 
node. Notice this is not the message owned by the reference.
+       * @param ref the reference being acked
+       */
+      public void addMessage(Message message, MessageReference ref) {
+         acks.add(new Pair<>(message, ref));
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) {
+         logger.debug("MirrorACKOperation::beforeCommit processing {}", acks);
+         for (Pair<Message, MessageReference> ack : acks) {

Review Comment:
   Maybe acks.forEach (here, and below), remove need for the [hidden] iterator?



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -216,60 +225,120 @@ private boolean sameNode(String remoteID, String 
sourceID) {
    }
 
    @Override
-   public void sendMessage(Message message, RoutingContext context, 
List<MessageReference> refs) {
+   public void sendMessage(Transaction tx, Message message, RoutingContext 
context) {
       SimpleString address = context.getAddress(message);
 
       if (invalidTarget(context.getMirrorSource())) {
-         logger.trace("server {} is discarding send to avoid infinite loop 
(reflection with the mirror)", server);
+         logger.trace("sendMessage::server {} is discarding send to avoid 
infinite loop (reflection with the mirror)", server);
          return;
       }
 
       if (context.isInternal()) {
-         logger.trace("server {} is discarding send to avoid sending to 
internal queue", server);
+         logger.trace("sendMessage::server {} is discarding send to avoid 
sending to internal queue", server);
          return;
       }
 
       if (ignoreAddress(address)) {
-         logger.trace("server {} is discarding send to address {}, address 
doesn't match filter", server, address);
+         logger.trace("sendMessage::server {} is discarding send to address 
{}, address doesn't match filter", server, address);
          return;
       }
 
-      logger.trace("{} send message {}", server, message);
+      logger.trace("sendMessage::{} send message {}", server, message);
 
       try {
          context.setReusable(false);
 
-         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
-         String nodeID = setProtocolData(idSupplier, ref);
+         String nodeID = idSupplier.getServerID(message);
+
          if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
-            logger.trace("Message {} already belonged to the node, {}, it 
won't circle send", message, getRemoteMirrorId());
+            logger.trace("sendMessage::Message {} already belonged to the 
node, {}, it won't circle send", message, getRemoteMirrorId());
             return;
          }
+
+         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
+         setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
          snfQueue.refUp(ref);
-         refs.add(ref);
+
+         if (tx != null) {
+            logger.debug("sendMessage::Mirroring Message {} with TX", message);
+            getSendOperation(tx).addRef(ref);
+         } // if non transactional the afterStoreOperations will use the ref 
directly and call processReferences
+
+         if (sync) {
+            OperationContext operContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+            if (tx == null) {
+               // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+               operContext.replicationLineUp();
+            }
+            if (logger.isDebugEnabled()) {
+               logger.debug("sendMessage::mirror syncUp context={}, ref={}", 
operContext, ref);
+            }
+            ref.setProtocolData(OperationContext.class, operContext);
+         }
 
          if (message.isDurable() && snfQueue.isDurable()) {
             PostOfficeImpl.storeDurableReference(server.getStorageManager(), 
message, context.getTransaction(), snfQueue, true);
          }
 
+         if (tx == null) {
+            server.getStorageManager().afterStoreOperations(new IOCallback() {
+               @Override
+               public void done() {
+                  PostOfficeImpl.processReference(ref, false);
+               }
+
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+               }
+            });
+         }
       } catch (Throwable e) {
          logger.warn(e.getMessage(), e);
       }
    }
 
+   private void syncDone(MessageReference reference) {
+      OperationContext ctx = reference.getProtocolData(OperationContext.class);
+      if (ctx != null) {
+         ctx.replicationDone();
+         logger.debug("syncDone::replicationDone::ctx={},ref={}", ctx, 
reference);
+      }  else {
+         Message message = reference.getMessage();
+         if (message != null) {
+            ctx = (OperationContext) 
message.getUserContext(OperationContext.class);
+            if (ctx != null) {
+               ctx.replicationDone();
+               logger.debug("syncDone::replicationDone  message={}", message);

Review Comment:
   extra space



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext 
context, List<MessageRef
       try {
          context.setReusable(false);
 
-         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
-         String nodeID = setProtocolData(idSupplier, ref);
+         String nodeID = idSupplier.getServerID(message);
+
          if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
             logger.trace("Message {} already belonged to the node, {}, it 
won't circle send", message, getRemoteMirrorId());
             return;
          }
+
+         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue);
+         setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
          snfQueue.refUp(ref);
+         List<MessageReference> refs;
+
+         if (tx != null) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Mirroring Message " + message + " with TX");
+            }
+            MirrorSendOperation mirrorSendOperation = getSendOperation(tx);
+            refs = mirrorSendOperation.mirroredRefs;
+         } else {
+            refs = new LinkedList<>();
+         }
+
          refs.add(ref);
 
+         if (sync) {
+            OperationContext operContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+            if (tx == null) {
+               // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+               operContext.replicationLineUp();
+            }
+            if (logger.isDebugEnabled()) {

Review Comment:
   What Tim said :)



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -303,8 +379,18 @@ private static Properties getProperties(Message message) {
       }
    }
 
+   private void postACKInternalMessage(MessageReference reference) {
+      if (logger.isDebugEnabled()) {

Review Comment:
   Would say the same unless server.getIdentity() is somehow expensive



##########
artemis-server/src/main/resources/schema/artemis-configuration.xsd:
##########
@@ -2447,6 +2447,14 @@
             </xsd:documentation>
          </xsd:annotation>
       </xsd:attribute>
+      <xsd:attribute name="sync" type="xsd:boolean" use="optional" 
default="true">
+         <xsd:annotation>
+            <xsd:documentation>
+               If this is true, producers will be waiting a response from the 
mirror before the sync is finished.
+               This is false by default.

Review Comment:
   Seems conflicted:
   
   default="true"
   
   "This is false by default."





Issue Time Tracking
-------------------

    Worklog Id:     (was: 840687)
    Time Spent: 3h  (was: 2h 50m)

> Mirror sync replication
> -----------------------
>
>                 Key: ARTEMIS-4136
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4136
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.28.0
>
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> I'm adding an option sync=true|false on mirror.
> It will be possible to configure a mirror as this:
>      <broker-connections>
>          <amqp-connection uri="tcp://test1:111" name="test1" 
> retry-interval="333" reconnect-attempts="33" user="testuser" 
> password="testpassword">
>             <mirror sync="true"/>
>        </amqp-connection
>    </broker-connection>
> if sync is set to true, any client blocking operation would wait a mirror 
> callback.
> With that option set, any blocking operation on the broker will wait a mirror 
> roundtrip:
> tx.commit(), session.send (non transactional). client.ack (when configured as 
> sync).
> Notice that in AMQP client dispositions are always asynchronous, hence it's 
> only possible to sync acks if using transactional for AMQP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to