clebertsuconic commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1082693359


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +395,181 @@ public void postAcknowledge(MessageReference ref, 
AckReason reason) throws Excep
 
       if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || 
ref.getQueue().isMirrorController()))) {
          if (logger.isDebugEnabled()) {
-            logger.debug("{} rejecting postAcknowledge queue={}, ref={} to 
avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
+            logger.debug("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, 
ref.getQueue().getName(), ref);
          }
          return;
       }
 
       if (ignoreAddress(ref.getQueue().getAddress())) {
          if (logger.isTraceEnabled()) {
-            logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue 
address is excluded", server, ref.getQueue().getName(), ref);
+            logger.trace("preAcknowledge::{} rejecting preAcknowledge 
queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(), 
ref);
          }
          return;
       }
 
-      logger.trace("{} postAcknowledge {}", server, ref);
+      logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
 
       String nodeID = idSupplier.getServerID(ref); // notice the brokerID will 
be null for any message generated on this broker.
       long internalID = idSupplier.getID(ref);
-      if (logger.isTraceEnabled()) {
-         logger.trace("{} sending ack message from server {} with 
messageID={}", server, nodeID, internalID);
+      Message messageCommand = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+      if (sync) {
+         OperationContext operationContext;
+         operationContext = 
OperationContextImpl.getContext(server.getExecutorFactory());
+         messageCommand.setUserContext(OperationContext.class, 
operationContext);
+         if (tx == null) {
+            // notice that if transactional, the context is lined up on 
beforeCommit as part of the transaction operation
+            operationContext.replicationLineUp();
+         }
+      }
+
+      if (tx != null) {
+         MirrorACKOperation operation = getAckOperation(tx);
+         // notice the operationContext.replicationLineUp is done on 
beforeCommit as part of the TX
+         operation.addMessage(messageCommand, ref);
+      } else {
+         server.getStorageManager().afterStoreOperations(new IOCallback() {
+            @Override
+            public void done() {
+               try {
+                  logger.debug("preAcknowledge::afterStoreOperation for 
messageReference {}", ref);
+                  route(server, messageCommand);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+            }
+         });
+      }
+   }
+
+   private MirrorACKOperation getAckOperation(Transaction tx) {
+      MirrorACKOperation ackOperation = (MirrorACKOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+      if (ackOperation == null) {
+         logger.trace("getAckOperation::setting operation on transaction {}", 
tx);
+         ackOperation = new MirrorACKOperation(server);
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION, 
ackOperation);
+         tx.afterStore(ackOperation);
+      }
+
+      return ackOperation;
+   }
+
+   private MirrorSendOperation getSendOperation(Transaction tx) {
+      if (tx == null) {
+         return null;
+      }
+      MirrorSendOperation sendOperation = (MirrorSendOperation) 
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+      if (sendOperation == null) {
+         logger.trace("getSendOperation::setting operation on transaction {}", 
tx);
+         sendOperation = new MirrorSendOperation();
+         tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION, 
sendOperation);
+         tx.afterStore(sendOperation);
+      }
+
+      return sendOperation;
+   }
+
+   private static class MirrorACKOperation extends 
TransactionOperationAbstract {
+
+      final ActiveMQServer server;
+
+      final LinkedList<MessageReference> refs = new LinkedList<>();

Review Comment:
   the number of elements is variable. It could be from one element to a large 
transaction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to