tabish121 commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1082629881
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +395,181 @@ public void postAcknowledge(MessageReference ref,
AckReason reason) throws Excep
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() ||
ref.getQueue().isMirrorController()))) {
if (logger.isDebugEnabled()) {
- logger.debug("{} rejecting postAcknowledge queue={}, ref={} to
avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
+ logger.debug("preAcknowledge::{} rejecting preAcknowledge
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
}
return;
}
if (ignoreAddress(ref.getQueue().getAddress())) {
if (logger.isTraceEnabled()) {
- logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue
address is excluded", server, ref.getQueue().getName(), ref);
+ logger.trace("preAcknowledge::{} rejecting preAcknowledge
queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(),
ref);
}
return;
}
- logger.trace("{} postAcknowledge {}", server, ref);
+ logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will
be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
- if (logger.isTraceEnabled()) {
- logger.trace("{} sending ack message from server {} with
messageID={}", server, nodeID, internalID);
+ Message messageCommand = createMessage(ref.getQueue().getAddress(),
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+ if (sync) {
+ OperationContext operationContext;
+ operationContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ messageCommand.setUserContext(OperationContext.class,
operationContext);
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operationContext.replicationLineUp();
+ }
+ }
+
+ if (tx != null) {
+ MirrorACKOperation operation = getAckOperation(tx);
+ // notice the operationContext.replicationLineUp is done on
beforeCommit as part of the TX
+ operation.addMessage(messageCommand, ref);
+ } else {
+ server.getStorageManager().afterStoreOperations(new IOCallback() {
+ @Override
+ public void done() {
+ try {
+ logger.debug("preAcknowledge::afterStoreOperation for
messageReference {}", ref);
+ route(server, messageCommand);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+ });
+ }
+ }
+
+ private MirrorACKOperation getAckOperation(Transaction tx) {
+ MirrorACKOperation ackOperation = (MirrorACKOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+ if (ackOperation == null) {
+ logger.trace("getAckOperation::setting operation on transaction {}",
tx);
+ ackOperation = new MirrorACKOperation(server);
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION,
ackOperation);
+ tx.afterStore(ackOperation);
+ }
+
+ return ackOperation;
+ }
+
+ private MirrorSendOperation getSendOperation(Transaction tx) {
+ if (tx == null) {
+ return null;
+ }
+ MirrorSendOperation sendOperation = (MirrorSendOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+ if (sendOperation == null) {
+ logger.trace("getSendOperation::setting operation on transaction {}",
tx);
+ sendOperation = new MirrorSendOperation();
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION,
sendOperation);
+ tx.afterStore(sendOperation);
+ }
+
+ return sendOperation;
+ }
+
+ private static class MirrorACKOperation extends
TransactionOperationAbstract {
+
+ final ActiveMQServer server;
+
+ final LinkedList<MessageReference> refs = new LinkedList<>();
Review Comment:
I cringe any time I see a LinkedList used as they are almost always the
wrong choice as opposed to the simpler ArrayList, what is driving the use of
these over the simpler option?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]