[
https://issues.apache.org/jira/browse/ARTEMIS-4136?focusedWorklogId=840679&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840679
]
ASF GitHub Bot logged work on ARTEMIS-4136:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 20/Jan/23 15:33
Start Date: 20/Jan/23 15:33
Worklog Time Spent: 10m
Work Description: clebertsuconic commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1082705389
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +395,181 @@ public void postAcknowledge(MessageReference ref,
AckReason reason) throws Excep
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() ||
ref.getQueue().isMirrorController()))) {
if (logger.isDebugEnabled()) {
- logger.debug("{} rejecting postAcknowledge queue={}, ref={} to
avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
+ logger.debug("preAcknowledge::{} rejecting preAcknowledge
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
}
return;
}
if (ignoreAddress(ref.getQueue().getAddress())) {
if (logger.isTraceEnabled()) {
- logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue
address is excluded", server, ref.getQueue().getName(), ref);
+ logger.trace("preAcknowledge::{} rejecting preAcknowledge
queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(),
ref);
}
return;
}
- logger.trace("{} postAcknowledge {}", server, ref);
+ logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will
be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
- if (logger.isTraceEnabled()) {
- logger.trace("{} sending ack message from server {} with
messageID={}", server, nodeID, internalID);
+ Message messageCommand = createMessage(ref.getQueue().getAddress(),
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+ if (sync) {
+ OperationContext operationContext;
+ operationContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ messageCommand.setUserContext(OperationContext.class,
operationContext);
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operationContext.replicationLineUp();
+ }
+ }
+
+ if (tx != null) {
+ MirrorACKOperation operation = getAckOperation(tx);
+ // notice the operationContext.replicationLineUp is done on
beforeCommit as part of the TX
+ operation.addMessage(messageCommand, ref);
+ } else {
+ server.getStorageManager().afterStoreOperations(new IOCallback() {
+ @Override
+ public void done() {
+ try {
+ logger.debug("preAcknowledge::afterStoreOperation for
messageReference {}", ref);
+ route(server, messageCommand);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+ });
+ }
+ }
+
+ private MirrorACKOperation getAckOperation(Transaction tx) {
+ MirrorACKOperation ackOperation = (MirrorACKOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+ if (ackOperation == null) {
+ logger.trace("getAckOperation::setting operation on transaction {}",
tx);
+ ackOperation = new MirrorACKOperation(server);
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION,
ackOperation);
+ tx.afterStore(ackOperation);
+ }
+
+ return ackOperation;
+ }
+
+ private MirrorSendOperation getSendOperation(Transaction tx) {
+ if (tx == null) {
+ return null;
+ }
+ MirrorSendOperation sendOperation = (MirrorSendOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+ if (sendOperation == null) {
+ logger.trace("getSendOperation::setting operation on transaction {}",
tx);
+ sendOperation = new MirrorSendOperation();
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION,
sendOperation);
+ tx.afterStore(sendOperation);
+ }
+
+ return sendOperation;
+ }
+
+ private static class MirrorACKOperation extends
TransactionOperationAbstract {
+
+ final ActiveMQServer server;
+
+ final LinkedList<MessageReference> refs = new LinkedList<>();
Review Comment:
actually... all the other TransactionOperations that are using a similar
pattern are using ArrayList.
I will just keep it the same here.
Issue Time Tracking
-------------------
Worklog Id: (was: 840679)
Time Spent: 2h 40m (was: 2.5h)
> Mirror sync replication
> -----------------------
>
> Key: ARTEMIS-4136
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4136
> Project: ActiveMQ Artemis
> Issue Type: New Feature
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.28.0
>
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> I'm adding an option sync=true|false on mirror.
> It will be possible to configure a mirror like this:
> <broker-connections>
> <amqp-connection uri="tcp://test1:111" name="test1"
> retry-interval="333" reconnect-attempts="33" user="testuser"
> password="testpassword">
> <mirror sync="true"/>
> </amqp-connection>
> </broker-connections>
> If sync is set to true, any blocking client operation will wait for a mirror
> callback.
> With that option set, any blocking operation on the broker will wait for a mirror
> roundtrip:
> tx.commit(), session.send (non-transactional), and client.ack (when configured as
> sync).
> Notice that in AMQP, client dispositions are always asynchronous; hence it's
> only possible to sync acks over AMQP when using transactions.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)