[
https://issues.apache.org/jira/browse/ARTEMIS-4136?focusedWorklogId=840687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840687
]
ASF GitHub Bot logged work on ARTEMIS-4136:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 20/Jan/23 16:10
Start Date: 20/Jan/23 16:10
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1082720217
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +397,178 @@ public void postAcknowledge(MessageReference ref,
AckReason reason) throws Excep
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() ||
ref.getQueue().isMirrorController()))) {
if (logger.isDebugEnabled()) {
- logger.debug("{} rejecting postAcknowledge queue={}, ref={} to
avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
+ logger.debug("preAcknowledge::{} rejecting preAcknowledge
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
}
return;
}
if (ignoreAddress(ref.getQueue().getAddress())) {
if (logger.isTraceEnabled()) {
- logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue
address is excluded", server, ref.getQueue().getName(), ref);
+ logger.trace("preAcknowledge::{} rejecting preAcknowledge
queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(),
ref);
}
return;
}
- logger.trace("{} postAcknowledge {}", server, ref);
+ logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will
be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
- if (logger.isTraceEnabled()) {
- logger.trace("{} sending ack message from server {} with
messageID={}", server, nodeID, internalID);
+ Message messageCommand = createMessage(ref.getQueue().getAddress(),
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+ if (sync) {
+ OperationContext operationContext;
+ operationContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ messageCommand.setUserContext(OperationContext.class,
operationContext);
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operationContext.replicationLineUp();
+ }
+ }
+
+ if (tx != null) {
+ MirrorACKOperation operation = getAckOperation(tx);
+ // notice the operationContext.replicationLineUp is done on
beforeCommit as part of the TX
+ operation.addMessage(messageCommand, ref);
+ } else {
+ server.getStorageManager().afterStoreOperations(new IOCallback() {
+ @Override
+ public void done() {
+ try {
+ logger.debug("preAcknowledge::afterStoreOperation for
messageReference {}", ref);
+ route(server, messageCommand);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+ });
+ }
+ }
+
+ private MirrorACKOperation getAckOperation(Transaction tx) {
+ MirrorACKOperation ackOperation = (MirrorACKOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+ if (ackOperation == null) {
+ logger.trace("getAckOperation::setting operation on transaction {}",
tx);
+ ackOperation = new MirrorACKOperation(server);
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION,
ackOperation);
+ tx.afterStore(ackOperation);
+ }
+
+ return ackOperation;
+ }
+
+ private MirrorSendOperation getSendOperation(Transaction tx) {
+ if (tx == null) {
+ return null;
+ }
+ MirrorSendOperation sendOperation = (MirrorSendOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+ if (sendOperation == null) {
+ logger.trace("getSendOperation::setting operation on transaction {}",
tx);
+ sendOperation = new MirrorSendOperation();
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION,
sendOperation);
+ tx.afterStore(sendOperation);
+ }
+
+ return sendOperation;
+ }
+
+ private static class MirrorACKOperation extends
TransactionOperationAbstract {
+
+ final ActiveMQServer server;
+
+ // This pair contains the Message used to generate the command towards
the target, the reference being acked
+ final List<Pair<Message, MessageReference>> acks = new ArrayList<>();
Review Comment:
Is either the reference or the message indexable, so that a map would work at
that point rather than creating intermediate objects to add both to the list?
Is use of this list thread safe? Will adds and removes always happen on the same
thread?
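For illustration only, a minimal sketch of the map alternative asked about above. It assumes the reference exposes a unique long ID to key on, and it uses a String as a stand-in for the command Message; `AckMapSketch` and its method names are hypothetical and are not part of the PR.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class AckMapSketch {
   // Keyed by a hypothetical reference ID; the value stands in for the command Message.
   // LinkedHashMap keeps insertion order, and the synchronized wrapper makes single
   // put/remove calls safe across threads. Iterating the map would still need an
   // explicit synchronized (acks) block.
   private final Map<Long, String> acks = Collections.synchronizedMap(new LinkedHashMap<>());

   void addMessage(long refId, String command) {
      acks.put(refId, command);
   }

   // Returns the stored command, or null if the reference was never added.
   String remove(long refId) {
      return acks.remove(refId);
   }

   int size() {
      return acks.size();
   }
}
```

If adds and removes are guaranteed to stay on one thread, the synchronized wrapper is unnecessary and a plain LinkedHashMap would do.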
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java:
##########
@@ -698,7 +700,7 @@ private void replicaTest(boolean largeMessage,
Connection connection = factory.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer =
session.createProducer(session.createQueue(getQueueName()));
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Review Comment:
Could also just remove the line.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +404,170 @@ public void postAcknowledge(MessageReference ref,
AckReason reason) throws Excep
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() ||
ref.getQueue().isMirrorController()))) {
if (logger.isDebugEnabled()) {
- logger.debug("{} rejecting postAcknowledge queue={}, ref={} to
avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
+ logger.debug("preAcknowledge::{} rejecting preAcknowledge
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
}
return;
}
if (ignoreAddress(ref.getQueue().getAddress())) {
if (logger.isTraceEnabled()) {
- logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue
address is excluded", server, ref.getQueue().getName(), ref);
+ logger.trace("preAcknowledge::{} rejecting preAcknowledge
queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(),
ref);
}
return;
}
- logger.trace("{} postAcknowledge {}", server, ref);
+ logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will
be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
- if (logger.isTraceEnabled()) {
- logger.trace("{} sending ack message from server {} with
messageID={}", server, nodeID, internalID);
+ Message messageCommand = createMessage(ref.getQueue().getAddress(),
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+ if (sync) {
+ OperationContext operationContext;
+ operationContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ messageCommand.setUserContext(OperationContext.class,
operationContext);
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operationContext.replicationLineUp();
+ }
+ }
+
+ if (tx != null) {
+ MirrorACKOperation operation = getAckOperation(tx);
+ // notice the operationContext.replicationLineUp is done on
beforeCommit as part of the TX
+ operation.addMessage(messageCommand, ref);
+ } else {
+ server.getStorageManager().afterStoreOperations(new IOCallback() {
+ @Override
+ public void done() {
+ try {
+ logger.debug("preAcknowledge::afterStoreOperation for
messageReference {}", ref);
+ route(server, messageCommand);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+ });
+ }
+ }
+
+ private MirrorACKOperation getAckOperation(Transaction tx) {
+ MirrorACKOperation ackOperation = (MirrorACKOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+ if (ackOperation == null) {
+ logger.trace("getAckOperation::setting operation on transaction {}",
tx);
+ ackOperation = new MirrorACKOperation(server);
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION,
ackOperation);
+ tx.afterStore(ackOperation);
+ }
+
+ return ackOperation;
+ }
+
+ private MirrorSendOperation getSendOperation(Transaction tx) {
+ if (tx == null) {
+ return null;
+ }
+ MirrorSendOperation sendOperation = (MirrorSendOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+ if (sendOperation == null) {
+ logger.trace("getSendOperation::setting operation on transaction {}",
tx);
+ sendOperation = new MirrorSendOperation();
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION,
sendOperation);
+ tx.afterStore(sendOperation);
+ }
+
+ return sendOperation;
+ }
+
+ private static class MirrorACKOperation extends
TransactionOperationAbstract {
+
+ final ActiveMQServer server;
+
+ // This pair contains the Message used to generate the command towards
the target, the reference being acked
+ final List<Pair<Message, MessageReference>> acks = new ArrayList<>();
+
+ MirrorACKOperation(ActiveMQServer server) {
+ this.server = server;
+ }
+
+ /**
+ *
+ * @param message the message with the instruction to ack on the target
node. Notice this is not the message owned by the reference.
+ * @param ref the reference being acked
+ */
+ public void addMessage(Message message, MessageReference ref) {
+ acks.add(new Pair<>(message, ref));
+ }
+
+ @Override
+ public void beforeCommit(Transaction tx) {
+ logger.debug("MirrorACKOperation::beforeCommit processing {}", acks);
+ for (Pair<Message, MessageReference> ack : acks) {
Review Comment:
Maybe use acks.forEach (here, and below) to remove the need for the [hidden] iterator?
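A small sketch of what that could look like, assuming the list stays an ArrayList (whose forEach walks the backing array directly rather than allocating an Iterator). The `Pair` class here is a hypothetical stand-in for the Artemis one, with String and Long in place of Message and MessageReference.

```java
import java.util.ArrayList;
import java.util.List;

final class AckForEachSketch {
   // Hypothetical stand-in for the Pair<Message, MessageReference> used in the PR.
   static final class Pair<A, B> {
      final A a;
      final B b;

      Pair(A a, B b) {
         this.a = a;
         this.b = b;
      }
   }

   // Visits every pair with forEach instead of an enhanced for loop.
   static List<String> describe(List<Pair<String, Long>> acks) {
      List<String> out = new ArrayList<>(acks.size());
      acks.forEach(ack -> out.add(ack.a + "->" + ack.b));
      return out;
   }
}
```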
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -216,60 +225,120 @@ private boolean sameNode(String remoteID, String
sourceID) {
}
@Override
- public void sendMessage(Message message, RoutingContext context,
List<MessageReference> refs) {
+ public void sendMessage(Transaction tx, Message message, RoutingContext
context) {
SimpleString address = context.getAddress(message);
if (invalidTarget(context.getMirrorSource())) {
- logger.trace("server {} is discarding send to avoid infinite loop
(reflection with the mirror)", server);
+ logger.trace("sendMessage::server {} is discarding send to avoid
infinite loop (reflection with the mirror)", server);
return;
}
if (context.isInternal()) {
- logger.trace("server {} is discarding send to avoid sending to
internal queue", server);
+ logger.trace("sendMessage::server {} is discarding send to avoid
sending to internal queue", server);
return;
}
if (ignoreAddress(address)) {
- logger.trace("server {} is discarding send to address {}, address
doesn't match filter", server, address);
+ logger.trace("sendMessage::server {} is discarding send to address
{}, address doesn't match filter", server, address);
return;
}
- logger.trace("{} send message {}", server, message);
+ logger.trace("sendMessage::{} send message {}", server, message);
try {
context.setReusable(false);
- MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
- String nodeID = setProtocolData(idSupplier, ref);
+ String nodeID = idSupplier.getServerID(message);
+
if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
- logger.trace("Message {} already belonged to the node, {}, it
won't circle send", message, getRemoteMirrorId());
+ logger.trace("sendMessage::Message {} already belonged to the
node, {}, it won't circle send", message, getRemoteMirrorId());
return;
}
+
+ MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
+ setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
snfQueue.refUp(ref);
- refs.add(ref);
+
+ if (tx != null) {
+ logger.debug("sendMessage::Mirroring Message {} with TX", message);
+ getSendOperation(tx).addRef(ref);
+ } // if non transactional the afterStoreOperations will use the ref
directly and call processReferences
+
+ if (sync) {
+ OperationContext operContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operContext.replicationLineUp();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("sendMessage::mirror syncUp context={}, ref={}",
operContext, ref);
+ }
+ ref.setProtocolData(OperationContext.class, operContext);
+ }
if (message.isDurable() && snfQueue.isDurable()) {
PostOfficeImpl.storeDurableReference(server.getStorageManager(),
message, context.getTransaction(), snfQueue, true);
}
+ if (tx == null) {
+ server.getStorageManager().afterStoreOperations(new IOCallback() {
+ @Override
+ public void done() {
+ PostOfficeImpl.processReference(ref, false);
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+ });
+ }
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
+ private void syncDone(MessageReference reference) {
+ OperationContext ctx = reference.getProtocolData(OperationContext.class);
+ if (ctx != null) {
+ ctx.replicationDone();
+ logger.debug("syncDone::replicationDone::ctx={},ref={}", ctx,
reference);
+ } else {
+ Message message = reference.getMessage();
+ if (message != null) {
+ ctx = (OperationContext)
message.getUserContext(OperationContext.class);
+ if (ctx != null) {
+ ctx.replicationDone();
+ logger.debug("syncDone::replicationDone message={}", message);
Review Comment:
extra space
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext
context, List<MessageRef
try {
context.setReusable(false);
- MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
- String nodeID = setProtocolData(idSupplier, ref);
+ String nodeID = idSupplier.getServerID(message);
+
if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
logger.trace("Message {} already belonged to the node, {}, it
won't circle send", message, getRemoteMirrorId());
return;
}
+
+ MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
+ setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
snfQueue.refUp(ref);
+ List<MessageReference> refs;
+
+ if (tx != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Mirroring Message " + message + " with TX");
+ }
+ MirrorSendOperation mirrorSendOperation = getSendOperation(tx);
+ refs = mirrorSendOperation.mirroredRefs;
+ } else {
+ refs = new LinkedList<>();
+ }
+
refs.add(ref);
+ if (sync) {
+ OperationContext operContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operContext.replicationLineUp();
+ }
+ if (logger.isDebugEnabled()) {
Review Comment:
What Tim said :)
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -303,8 +379,18 @@ private static Properties getProperties(Message message) {
}
}
+ private void postACKInternalMessage(MessageReference reference) {
+ if (logger.isDebugEnabled()) {
Review Comment:
Would say the same unless server.getIdentity() is somehow expensive
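To illustrate the trade-off, here is a sketch using java.util.logging as a stand-in for the SLF4J logger in the PR (an assumption made only to keep the example dependency-free). The `identity()` method is a hypothetical placeholder for server.getIdentity(). With parameterized logging the message formatting is deferred, but the argument expressions are still evaluated at the call site, so the guard only pays off when an argument is costly to compute.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LogGuardSketch {
   static final Logger LOGGER = Logger.getLogger("LogGuardSketch");
   static final AtomicInteger identityCalls = new AtomicInteger();

   // Hypothetical stand-in for server.getIdentity(); counts how often it is evaluated.
   static String identity() {
      identityCalls.incrementAndGet();
      return "broker-1";
   }

   static void unguarded(String ref) {
      // Formatting is skipped when FINE is disabled, but identity() still runs.
      LOGGER.log(Level.FINE, "postACKInternalMessage::server={0}, ref={1}", new Object[] {identity(), ref});
   }

   static void guarded(String ref) {
      // The guard also skips evaluating identity() when FINE is disabled.
      if (LOGGER.isLoggable(Level.FINE)) {
         LOGGER.log(Level.FINE, "postACKInternalMessage::server={0}, ref={1}", new Object[] {identity(), ref});
      }
   }
}
```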
##########
artemis-server/src/main/resources/schema/artemis-configuration.xsd:
##########
@@ -2447,6 +2447,14 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="sync" type="xsd:boolean" use="optional"
default="true">
+ <xsd:annotation>
+ <xsd:documentation>
+ If this is true, producers will be waiting a response from the
mirror before the sync is finished.
+ This is false by default.
Review Comment:
Seems conflicted:
default="true"
"This is false by default."
Issue Time Tracking
-------------------
Worklog Id: (was: 840687)
Time Spent: 3h (was: 2h 50m)
> Mirror sync replication
> -----------------------
>
> Key: ARTEMIS-4136
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4136
> Project: ActiveMQ Artemis
> Issue Type: New Feature
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.28.0
>
> Time Spent: 3h
> Remaining Estimate: 0h
>
> I'm adding an option sync=true|false on the mirror.
> It will be possible to configure a mirror like this:
> <broker-connections>
> <amqp-connection uri="tcp://test1:111" name="test1"
> retry-interval="333" reconnect-attempts="33" user="testuser"
> password="testpassword">
> <mirror sync="true"/>
> </amqp-connection>
> </broker-connections>
> If sync is set to true, any blocking client operation will wait for a mirror
> callback.
> With that option set, any blocking operation on the broker will wait for a
> mirror round trip:
> tx.commit(), session.send (non-transactional), client.ack (when configured as
> sync).
> Notice that in AMQP, client dispositions are always asynchronous, hence it's
> only possible to sync acks for AMQP when using transactions.
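The waiting behaviour described above follows a line-up/done pattern, visible in the diff as replicationLineUp() and replicationDone(). As a rough, simplified sketch (not the Artemis OperationContextImpl, which does considerably more), a context could count pending mirror round trips and release the blocked operation once the count returns to zero. `LineUpSketch` and its constructor argument are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LineUpSketch {
   private final AtomicInteger pending = new AtomicInteger();
   private final Runnable onComplete;

   // onComplete is a hypothetical callback that unblocks the client operation.
   LineUpSketch(Runnable onComplete) {
      this.onComplete = onComplete;
   }

   // Called before the command is sent to the mirror.
   void replicationLineUp() {
      pending.incrementAndGet();
   }

   // Called when the mirror confirms; fires the callback once nothing is pending.
   void replicationDone() {
      if (pending.decrementAndGet() == 0) {
         onComplete.run();
      }
   }
}
```

With two commands lined up, the callback runs only after the second confirmation arrives.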