[ 
https://issues.apache.org/jira/browse/ARTEMIS-4136?focusedWorklogId=841144&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841144
 ]

ASF GitHub Bot logged work on ARTEMIS-4136:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Jan/23 14:36
            Start Date: 23/Jan/23 14:36
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1084133462


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +397,178 @@ public void postAcknowledge(MessageReference ref, 
AckReason reason) throws Excep
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(), 
ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("preAcknowledge::afterStoreOperation for 
messageReference {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      // This pair contains the Message used to generate the command towards 
the target, the reference being acked
+      final List<Pair<Message, MessageReference>> acks = new ArrayList<>();

Review Comment:
   @gemmellr I will just squash the separate commit. It looks better actually.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 841144)
    Time Spent: 4h 40m  (was: 4.5h)

> Mirror sync replication
> -----------------------
>
>                 Key: ARTEMIS-4136
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4136
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.28.0
>
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> I'm adding an option sync=true|false on mirror.
> It will be possible to configure a mirror as this:
>      <broker-connections>
>          <amqp-connection uri="tcp://test1:111" name="test1" 
> retry-interval="333" reconnect-attempts="33" user="testuser" 
> password="testpassword">
>             <mirror sync="true"/>
>        </amqp-connection
>    </broker-connection>
> if sync is set to true, any client blocking operation would wait a mirror 
> callback.
> With that option set, any blocking operation on the broker will wait a mirror 
> roundtrip:
> tx.commit(), session.send (non transactional). client.ack (when configured as 
> sync).
> Notice that in AMQP client dispositions are always asynchronous, hence it's 
> only possible to sync acks if using transactional for AMQP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to