clebertsuconic commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1084110543


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +397,178 @@ public void postAcknowledge(MessageReference ref, 
AckReason reason) throws Excep
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(), 
ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("preAcknowledge::afterStoreOperation for 
messageReference {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      // This pair contains the Message used to generate the command towards 
the target, the reference being acked
+      final List<Pair<Message, MessageReference>> acks = new ArrayList<>();

Review Comment:
   I made a comment last week, but I pressed "Start a review" by accident 
(weird), and all my comments were lost.
   
   I had actually mentioned that I didn't think it would make much 
difference... and I just realized what you meant about the Map.
   
   I'm now using a Map in a separate commit. Could you take a look? I will 
squash it after you have reviewed it.
   
   I don't think it will make much difference on the runtime... although the 
code is cleaner this way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to