[ 
https://issues.apache.org/jira/browse/ARTEMIS-4136?focusedWorklogId=841128&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841128
 ]

ASF GitHub Bot logged work on ARTEMIS-4136:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Jan/23 13:46
            Start Date: 23/Jan/23 13:46
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1084076247


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +397,178 @@ public void postAcknowledge(MessageReference ref, 
AckReason reason) throws Excep
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(), 
ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("preAcknowledge::afterStoreOperation for 
messageReference {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      // This pair contains the Message used to generate the command towards 
the target, the reference being acked
+      final List<Pair<Message, MessageReference>> acks = new ArrayList<>();

Review Comment:
   So, no removals  as the entire thing is discarded along with the parent tx 
object, ok. (Oops, I hadnt got to looking at the rest when commenting that, 
didnt go back hehe),
   
   If the 'UserContext' from the message is already by something else, then 
there likely isnt much difference as you say, though it will need to do a 
lookup (I was thinking a map here, it could foreach). If it isnt already in 
use, then starting to use it would be worse than the original Pair's.
   
   So yeah...maybe just put back the annoying Pair. It only applies in the Sync 
case, right?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 841128)
    Time Spent: 3h 50m  (was: 3h 40m)

> Mirror sync replication
> -----------------------
>
>                 Key: ARTEMIS-4136
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4136
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.28.0
>
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> I'm adding an option sync=true|false on mirror.
> It will be possible to configure a mirror as this:
>      <broker-connections>
>          <amqp-connection uri="tcp://test1:111" name="test1" 
> retry-interval="333" reconnect-attempts="33" user="testuser" 
> password="testpassword">
>             <mirror sync="true"/>
>        </amqp-connection
>    </broker-connection>
> if sync is set to true, any client blocking operation would wait a mirror 
> callback.
> With that option set, any blocking operation on the broker will wait a mirror 
> roundtrip:
> tx.commit(), session.send (non transactional). client.ack (when configured as 
> sync).
> Notice that in AMQP client dispositions are always asynchronous, hence it's 
> only possible to sync acks if using transactional for AMQP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to