This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 0d3cd8d880 ARTEMIS-4136 Mirrored sync replica
0d3cd8d880 is described below
commit 0d3cd8d88031dbf78851969a56041e839f828bf7
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Jan 18 16:58:39 2023 -0500
ARTEMIS-4136 Mirrored sync replica
This adds a sync option (sync=true or false) to the mirror. If sync is true, any client
blocking operation will wait for a round trip to the mirror,
making the mirror act like a synchronous replica.
---
.../protocol/amqp/broker/AMQPLargeMessage.java | 5 +-
.../artemis/protocol/amqp/broker/AMQPMessage.java | 8 +-
.../mirror/AMQPMirrorControllerAggregation.java | 12 +-
.../connect/mirror/AMQPMirrorControllerSource.java | 299 +++++++++++--
.../connect/mirror/AMQPMirrorControllerTarget.java | 13 +-
.../amqp/connect/mirror/ReferenceNodeStore.java | 15 +-
.../amqp/proton/ProtonServerSenderContext.java | 7 +-
.../core/protocol/openwire/amq/AMQConsumer.java | 4 +-
.../AMQPMirrorBrokerConnectionElement.java | 10 +
.../deployers/impl/FileConfigurationParser.java | 3 +-
.../core/paging/cursor/PagedReferenceImpl.java | 16 +-
.../impl/journal/OperationContextImpl.java | 8 +-
.../artemis/core/postoffice/PostOffice.java | 4 +
.../core/postoffice/impl/PostOfficeImpl.java | 24 +-
.../artemis/core/server/MessageReference.java | 4 +-
.../server/impl/AbstractProtocolReference.java | 49 +++
.../server/impl/GroupFirstMessageReference.java | 8 +-
.../core/server/impl/MessageReferenceImpl.java | 13 +-
.../artemis/core/server/impl/QueueImpl.java | 4 +-
.../core/server/mirror/MirrorController.java | 7 +-
.../transaction/TransactionPropertyIndexes.java | 4 +
.../core/transaction/impl/TransactionImpl.java | 29 +-
.../resources/schema/artemis-configuration.xsd | 8 +
.../core/config/impl/ConfigurationImplTest.java | 9 +
.../resources/ConfigurationTest-full-config.xml | 2 +-
docs/user-manual/en/amqp-broker-connections.md | 8 +-
.../amqp/AmqpReferenceDeliveryAnnotationTest.java | 2 +-
.../integration/amqp/connect/AMQPReplicaTest.java | 5 +-
.../amqp/connect/AMQPSyncMirrorTest.java | 474 +++++++++++++++++++++
.../config/impl/ConfigurationValidationTest.java | 2 +
30 files changed, 934 insertions(+), 122 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index ec86311338..e9987ed9ff 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -44,6 +44,7 @@ import
org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
@@ -148,8 +149,8 @@ public class AMQPLargeMessage extends AMQPMessage
implements LargeServerMessage
* It was written to check the deliveryAnnotationsForSendBuffer and
eventually move it to the protocolData.
*/
public void checkReference(MessageReference reference) {
- if (reference.getProtocolData() == null &&
deliveryAnnotationsForSendBuffer != null) {
- reference.setProtocolData(deliveryAnnotationsForSendBuffer);
+ if (reference.getProtocolData(DeliveryAnnotations.class) == null &&
deliveryAnnotationsForSendBuffer != null) {
+ reference.setProtocolData(DeliveryAnnotations.class,
deliveryAnnotationsForSendBuffer);
}
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index df3bdeda73..84e319cd2c 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -754,12 +754,10 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
public ReadableBuffer getSendBuffer(int deliveryCount, MessageReference
reference) {
ensureDataIsValid();
- DeliveryAnnotations daToWrite;
+ DeliveryAnnotations daToWrite = reference != null ?
reference.getProtocolData(DeliveryAnnotations.class) : null;
- if (reference != null && reference.getProtocolData() instanceof
DeliveryAnnotations) {
- daToWrite = (DeliveryAnnotations) reference.getProtocolData();
- } else {
- // deliveryAnnotationsForSendBuffer was an old API form where a
deliver could set it before deliver
+ if (reference == null) {
+ // deliveryAnnotationsForSendBuffer is part of an older API,
deprecated but still present
daToWrite = deliveryAnnotationsForSendBuffer;
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
index cc5fb7eafb..007f9a0d83 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerAggregation.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
/** this will be used when there are multiple replicas in use. */
public class AMQPMirrorControllerAggregation implements MirrorController,
ActiveMQComponent {
@@ -72,6 +73,13 @@ public class AMQPMirrorControllerAggregation implements
MirrorController, Active
return partitions;
}
+ @Override
+ public void preAcknowledge(Transaction tx, MessageReference ref, AckReason
reason) throws Exception {
+ for (MirrorController partition : partitions) {
+ partition.preAcknowledge(tx, ref, reason);
+ }
+ }
+
@Override
public void addAddress(AddressInfo addressInfo) throws Exception {
for (MirrorController partition : partitions) {
@@ -102,9 +110,9 @@ public class AMQPMirrorControllerAggregation implements
MirrorController, Active
}
@Override
- public void sendMessage(Message message, RoutingContext context,
List<MessageReference> refs) {
+ public void sendMessage(Transaction tx, Message message, RoutingContext
context) {
for (MirrorController partition : partitions) {
- partition.sendMessage(message, context, refs);
+ partition.sendMessage(tx, message, context);
}
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 188742af2c..d9052c4623 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -24,6 +25,9 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -34,6 +38,9 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import
org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
@@ -86,6 +93,7 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
final boolean deleteQueues;
final MirrorAddressFilter addressFilter;
private final AMQPBrokerConnection brokerConnection;
+ private final boolean sync;
final AMQPMirrorBrokerConnectionElement replicaConfig;
@@ -116,6 +124,7 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
this.addressFilter = new
MirrorAddressFilter(replicaConfig.getAddressFilter());
this.acks = replicaConfig.isMessageAcknowledgements();
this.brokerConnection = brokerConnection;
+ this.sync = replicaConfig.isSync();
}
public Queue getSnfQueue() {
@@ -216,60 +225,120 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
}
@Override
- public void sendMessage(Message message, RoutingContext context,
List<MessageReference> refs) {
+ public void sendMessage(Transaction tx, Message message, RoutingContext
context) {
SimpleString address = context.getAddress(message);
if (invalidTarget(context.getMirrorSource())) {
- logger.trace("server {} is discarding send to avoid infinite loop
(reflection with the mirror)", server);
+ logger.trace("sendMessage::server {} is discarding send to avoid
infinite loop (reflection with the mirror)", server);
return;
}
if (context.isInternal()) {
- logger.trace("server {} is discarding send to avoid sending to
internal queue", server);
+ logger.trace("sendMessage::server {} is discarding send to avoid
sending to internal queue", server);
return;
}
if (ignoreAddress(address)) {
- logger.trace("server {} is discarding send to address {}, address
doesn't match filter", server, address);
+ logger.trace("sendMessage::server {} is discarding send to address
{}, address doesn't match filter", server, address);
return;
}
- logger.trace("{} send message {}", server, message);
+ logger.trace("sendMessage::{} send message {}", server, message);
try {
context.setReusable(false);
- MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
- String nodeID = setProtocolData(idSupplier, ref);
+ String nodeID = idSupplier.getServerID(message);
+
if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
- logger.trace("Message {} already belonged to the node, {}, it
won't circle send", message, getRemoteMirrorId());
+ logger.trace("sendMessage::Message {} already belonged to the
node, {}, it won't circle send", message, getRemoteMirrorId());
return;
}
+
+ MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
+ setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
snfQueue.refUp(ref);
- refs.add(ref);
+
+ if (tx != null) {
+ logger.debug("sendMessage::Mirroring Message {} with TX", message);
+ getSendOperation(tx).addRef(ref);
+ } // if non transactional the afterStoreOperations will use the ref
directly and call processReferences
+
+ if (sync) {
+ OperationContext operContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operContext.replicationLineUp();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("sendMessage::mirror syncUp context={}, ref={}",
operContext, ref);
+ }
+ ref.setProtocolData(OperationContext.class, operContext);
+ }
if (message.isDurable() && snfQueue.isDurable()) {
PostOfficeImpl.storeDurableReference(server.getStorageManager(),
message, context.getTransaction(), snfQueue, true);
}
+ if (tx == null) {
+ server.getStorageManager().afterStoreOperations(new IOCallback() {
+ @Override
+ public void done() {
+ PostOfficeImpl.processReference(ref, false);
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+ });
+ }
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
+ private void syncDone(MessageReference reference) {
+ OperationContext ctx = reference.getProtocolData(OperationContext.class);
+ if (ctx != null) {
+ ctx.replicationDone();
+ logger.debug("syncDone::replicationDone::ctx={},ref={}", ctx,
reference);
+ } else {
+ Message message = reference.getMessage();
+ if (message != null) {
+ ctx = (OperationContext)
message.getUserContext(OperationContext.class);
+ if (ctx != null) {
+ ctx.replicationDone();
+ logger.debug("syncDone::replicationDone message={}", message);
+ } else {
+ logger.trace("syncDone::No operationContext set on message {}",
message);
+ }
+ } else {
+ logger.debug("syncDone::no message set on reference {}",
reference);
+ }
+ }
+ }
+
public static void validateProtocolData(ReferenceNodeStore
referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
- if (ref.getProtocolData() == null &&
!ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
+ if (ref.getProtocolData(DeliveryAnnotations.class) == null &&
!ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
setProtocolData(referenceIDSupplier, ref);
}
}
/** This method will return the brokerID used by the message */
private static String setProtocolData(ReferenceNodeStore
referenceIDSupplier, MessageReference ref) {
+ String brokerID = referenceIDSupplier.getServerID(ref);
+ long id = referenceIDSupplier.getID(ref);
+
+ setProtocolData(ref, brokerID, id);
+
+ return brokerID;
+ }
+
+ private static void setProtocolData(MessageReference ref, String brokerID,
long id) {
Map<Symbol, Object> daMap = new HashMap<>();
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);
- String brokerID = referenceIDSupplier.getServerID(ref);
-
// getListID will return null when the message was generated on this
broker.
// on this case we do not send the brokerID, and the ControllerTarget
will get the information from the link.
// this is just to safe a few bytes and some processing on the wire.
@@ -278,8 +347,6 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
daMap.put(BROKER_ID, brokerID);
}
- long id = referenceIDSupplier.getID(ref);
-
daMap.put(INTERNAL_ID, id);
String address = ref.getMessage().getAddress();
if (address != null) { // this is the message that was set through
routing
@@ -290,9 +357,7 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
}
}
- ref.setProtocolData(deliveryAnnotations);
-
- return brokerID;
+ ref.setProtocolData(DeliveryAnnotations.class, deliveryAnnotations);
}
private static Properties getProperties(Message message) {
@@ -303,12 +368,30 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
}
}
+ private void postACKInternalMessage(MessageReference reference) {
+ logger.debug("postACKInternalMessage::server={}, ref={}", server,
reference);
+ if (sync) {
+ syncDone(reference);
+ }
+ }
+
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) throws
Exception {
+ if (!acks || ref.getQueue().isMirrorController()) {
+ postACKInternalMessage(ref);
+ return;
+ }
+ }
+
+ @Override
+ public void preAcknowledge(final Transaction tx, final MessageReference
ref, final AckReason reason) throws Exception {
+ if (logger.isTraceEnabled()) {
+ logger.trace("postACKInternalMessage::tx={}, ref={}, reason={}", tx,
ref, reason);
+ }
MirrorController controllerInUse = getControllerInUse();
- if (!acks || ref.getQueue().isMirrorController()) { // we don't call
postACK on snfqueues, otherwise we would get infinite loop because of this
feedback/
+ if (!acks || ref.getQueue().isMirrorController()) { // we don't call
preAcknowledge on snfqueues, otherwise we would get infinite loop because of
this feedback/
return;
}
@@ -318,28 +401,192 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() ||
ref.getQueue().isMirrorController()))) {
if (logger.isDebugEnabled()) {
- logger.debug("{} rejecting postAcknowledge queue={}, ref={} to
avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
+ logger.debug("preAcknowledge::{} rejecting preAcknowledge
queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
}
return;
}
if (ignoreAddress(ref.getQueue().getAddress())) {
if (logger.isTraceEnabled()) {
- logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue
address is excluded", server, ref.getQueue().getName(), ref);
+ logger.trace("preAcknowledge::{} rejecting preAcknowledge
queue={}, ref={}, queue address is excluded", server, ref.getQueue().getName(),
ref);
}
return;
}
- logger.trace("{} postAcknowledge {}", server, ref);
+ logger.trace("preAcknowledge::{} preAcknowledge {}", server, ref);
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will
be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
- if (logger.isTraceEnabled()) {
- logger.trace("{} sending ack message from server {} with
messageID={}", server, nodeID, internalID);
+ Message messageCommand = createMessage(ref.getQueue().getAddress(),
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+ if (sync) {
+ OperationContext operationContext;
+ operationContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ messageCommand.setUserContext(OperationContext.class,
operationContext);
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operationContext.replicationLineUp();
+ }
+ }
+
+ if (tx != null) {
+ MirrorACKOperation operation = getAckOperation(tx);
+ // notice the operationContext.replicationLineUp is done on
beforeCommit as part of the TX
+ operation.addMessage(messageCommand, ref);
+ } else {
+ server.getStorageManager().afterStoreOperations(new IOCallback() {
+ @Override
+ public void done() {
+ try {
+ logger.debug("preAcknowledge::afterStoreOperation for
messageReference {}", ref);
+ route(server, messageCommand);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+ });
+ }
+ }
+
+ private MirrorACKOperation getAckOperation(Transaction tx) {
+ MirrorACKOperation ackOperation = (MirrorACKOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+ if (ackOperation == null) {
+ logger.trace("getAckOperation::setting operation on transaction {}",
tx);
+ ackOperation = new MirrorACKOperation(server);
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION,
ackOperation);
+ tx.afterStore(ackOperation);
+ }
+
+ return ackOperation;
+ }
+
+ private MirrorSendOperation getSendOperation(Transaction tx) {
+ if (tx == null) {
+ return null;
+ }
+ MirrorSendOperation sendOperation = (MirrorSendOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+ if (sendOperation == null) {
+ logger.trace("getSendOperation::setting operation on transaction {}",
tx);
+ sendOperation = new MirrorSendOperation();
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION,
sendOperation);
+ tx.afterStore(sendOperation);
+ }
+
+ return sendOperation;
+ }
+
+ private static class MirrorACKOperation extends
TransactionOperationAbstract {
+
+ final ActiveMQServer server;
+
+ // This map contains the Message used to generate the command towards
the target, the reference being acked
+ final HashMap<Message, MessageReference> acks = new HashMap<>();
+
+ MirrorACKOperation(ActiveMQServer server) {
+ this.server = server;
+ }
+
+ /**
+ *
+ * @param message the message with the instruction to ack on the target
node. Notice this is not the message owned by the reference.
+ * @param ref the reference being acked
+ */
+ public void addMessage(Message message, MessageReference ref) {
+ acks.put(message, ref);
+ }
+
+ @Override
+ public void beforeCommit(Transaction tx) {
+ logger.debug("MirrorACKOperation::beforeCommit processing {}", acks);
+ acks.forEach(this::doBeforeCommit);
+ }
+
+ // callback to be used on forEach
+ private void doBeforeCommit(Message ack, MessageReference ref) {
+ OperationContext context = (OperationContext)
ack.getUserContext(OperationContext.class);
+ if (context != null) {
+ context.replicationLineUp();
+ }
+ }
+
+ @Override
+ public void afterCommit(Transaction tx) {
+ logger.debug("MirrorACKOperation::afterCommit processing {}", acks);
+ acks.forEach(this::doAfterCommit);
+ }
+
+ // callback to be used on forEach
+ private void doAfterCommit(Message ack, MessageReference ref) {
+ try {
+ route(server, ack);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ ref.getMessage().usageDown();
+ }
+
+ @Override
+ public void afterRollback(Transaction tx) {
+ acks.forEach(this::doAfterRollback);
+ }
+
+ // callback to be used on forEach
+ private void doAfterRollback(Message ack, MessageReference ref) {
+ OperationContext context = (OperationContext)
ack.getUserContext(OperationContext.class);
+ if (context != null) {
+ context.replicationDone();
+ }
+ }
+
+ }
+
+ private static final class MirrorSendOperation extends
TransactionOperationAbstract {
+ final List<MessageReference> refs = new ArrayList<>();
+
+ public void addRef(MessageReference ref) {
+ refs.add(ref);
+ }
+
+ @Override
+ public void beforeCommit(Transaction tx) {
+ refs.forEach(this::doBeforeCommit);
+ }
+
+ // callback to be used on forEach
+ private void doBeforeCommit(MessageReference ref) {
+ OperationContext context =
ref.getProtocolData(OperationContext.class);
+ if (context != null) {
+ context.replicationLineUp();
+ }
+ }
+
+ @Override
+ public void afterRollback(Transaction tx) {
+ logger.debug("MirrorSendOperation::afterRollback, refs:{}", refs);
+ refs.forEach(this::doBeforeRollback);
+ }
+
+ // forEach callback
+ private void doBeforeRollback(MessageReference ref) {
+ OperationContext localCTX =
ref.getProtocolData(OperationContext.class);
+ if (localCTX != null) {
+ localCTX.replicationDone();
+ }
+ }
+
+ @Override
+ public void afterCommit(Transaction tx) {
+ logger.debug("MirrorSendOperation::afterCommit refs:{}", refs);
+ refs.forEach(this::doAfterCommit);
+ }
+
+ // forEach callback
+ private void doAfterCommit(MessageReference ref) {
+ PostOfficeImpl.processReference(ref, false);
}
- Message message = createMessage(ref.getQueue().getAddress(),
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
- route(server, message);
- ref.getMessage().usageDown();
}
private Message createMessage(SimpleString address, SimpleString queue,
Object event, String brokerID, Object body) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 74edd0a42e..1140f5c8b4 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
-import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;
@@ -279,6 +278,11 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
return AddressInfo.fromJSON(body);
}
+ @Override
+ public void preAcknowledge(Transaction tx, MessageReference ref, AckReason
reason) throws Exception {
+ // NO-OP
+ }
+
@Override
public void addAddress(AddressInfo addressInfo) throws Exception {
logger.debug("{} adding address {}", server, addressInfo);
@@ -359,10 +363,11 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
}
private void performAck(String nodeID, long messageID, Queue targetQueue,
ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) {
+ MessageReference reference = targetQueue.removeWithSuppliedID(nodeID,
messageID, referenceNodeStore);
+
if (logger.isTraceEnabled()) {
- logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}",
nodeID, messageID, targetQueue.getName());
+ logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}).
Ref={}", nodeID, messageID, targetQueue.getName(), reference);
}
- MessageReference reference = targetQueue.removeWithSuppliedID(nodeID,
messageID, referenceNodeStore);
if (reference == null) {
if (logger.isDebugEnabled()) {
@@ -490,7 +495,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
}
@Override
- public void sendMessage(Message message, RoutingContext context,
List<MessageReference> refs) {
+ public void sendMessage(Transaction tx, Message message, RoutingContext
context) {
// Do nothing
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
index b0a8605fa4..d82560cace 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
@@ -19,6 +19,7 @@ package
org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.util.HashMap;
import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.utils.collections.NodeStore;
@@ -112,7 +113,12 @@ public class ReferenceNodeStore implements
NodeStore<MessageReference> {
}
public String getServerID(MessageReference element) {
- Object nodeID =
element.getMessage().getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
+ return getServerID(element.getMessage());
+ }
+
+
+ public String getServerID(Message message) {
+ Object nodeID =
message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
if (nodeID != null) {
return nodeID.toString();
} else {
@@ -124,7 +130,8 @@ public class ReferenceNodeStore implements
NodeStore<MessageReference> {
}
public long getID(MessageReference element) {
- Long id = (Long)
element.getMessage().getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
+ Message message = element.getMessage();
+ Long id = getID(message);
if (id == null) {
return element.getMessageID();
} else {
@@ -132,6 +139,10 @@ public class ReferenceNodeStore implements
NodeStore<MessageReference> {
}
}
+ private Long getID(Message message) {
+ return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
+ }
+
@Override
public synchronized void clear() {
lists.forEach((k, v) -> v.clear());
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 2d85abaf84..98d62c46cb 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -683,14 +683,9 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
frameBuffer.clear();
- DeliveryAnnotations deliveryAnnotationsToEncode;
message.checkReference(reference);
- if (reference.getProtocolData() != null &&
reference.getProtocolData() instanceof DeliveryAnnotations) {
- deliveryAnnotationsToEncode =
(DeliveryAnnotations)reference.getProtocolData();
- } else {
- deliveryAnnotationsToEncode = null;
- }
+ DeliveryAnnotations deliveryAnnotationsToEncode =
reference.getProtocolData(DeliveryAnnotations.class);
try {
replaceInitialHeader(deliveryAnnotationsToEncode, context, new
NettyWritable(frameBuffer));
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index d064d5036c..2db8d835d0 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -295,7 +295,7 @@ public class AMQConsumer {
//handleDeliver is performed by an executor (see JBPAPP-6030): any
AMQConsumer can share the session.wireFormat()
dispatch = OpenWireMessageConverter.createMessageDispatch(reference,
message, session.wireFormat(), this,
session.getCoreServer().getNodeManager().getUUID());
int size = dispatch.getMessage().getSize();
- reference.setProtocolData(dispatch.getMessage().getMessageId());
+ reference.setProtocolData(MessageId.class,
dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch);
currentWindow.decrementAndGet();
return size;
@@ -337,7 +337,7 @@ public class AMQConsumer {
// if it's browse only, nothing to be acked
final boolean removeReferences = !serverConsumer.isBrowseOnly() &&
!serverConsumer.getQueue().isNonDestructive();
- final List<MessageReference> ackList =
serverConsumer.scanDeliveringReferences(removeReferences, reference ->
startID.equals(reference.getProtocolData()), reference ->
lastID.equals(reference.getProtocolData()));
+ final List<MessageReference> ackList =
serverConsumer.scanDeliveringReferences(removeReferences, reference ->
startID.equals(reference.getProtocolData(MessageId.class)), reference ->
lastID.equals(reference.getProtocolData(MessageId.class)));
if (!ackList.isEmpty() || !removeReferences ||
serverConsumer.getQueue().isTemporary()) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
index 227b669fc7..f71ee913a1 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
@@ -28,6 +28,8 @@ public class AMQPMirrorBrokerConnectionElement extends
AMQPBrokerConnectionEleme
boolean messageAcknowledgements = true;
+ boolean sync = false;
+
SimpleString mirrorSNF;
String addressFilter;
@@ -98,4 +100,12 @@ public class AMQPMirrorBrokerConnectionElement extends
AMQPBrokerConnectionEleme
return this;
}
+ public boolean isSync() {
+ return sync;
+ }
+
+ public AMQPMirrorBrokerConnectionElement setSync(boolean sync) {
+ this.sync = sync;
+ return this;
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 669d76d94b..3bdb0ca706 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -2121,10 +2121,11 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
boolean queueCreation =
getBooleanAttribute(e2,"queue-creation", true);
boolean durable = getBooleanAttribute(e2, "durable", true);
boolean queueRemoval = getBooleanAttribute(e2, "queue-removal",
true);
+ boolean sync = getBooleanAttribute(e2, "sync", false);
String addressFilter = getAttributeValue(e2, "address-filter");
AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement =
new AMQPMirrorBrokerConnectionElement();
-
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter);
+
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync);
connectionElement = amqpMirrorConnectionElement;
connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR);
} else {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 55c38c820a..0abc464360 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -26,14 +26,14 @@ import
org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.impl.AbstractProtocolReference;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
-public class PagedReferenceImpl extends
LinkedListImpl.Node<PagedReferenceImpl> implements PagedReference, Runnable {
+public class PagedReferenceImpl extends AbstractProtocolReference implements
PagedReference, Runnable {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -75,8 +75,6 @@ public class PagedReferenceImpl extends
LinkedListImpl.Node<PagedReferenceImpl>
private boolean alreadyAcked;
- private Object protocolData;
-
//0 is false, 1 is true, 2 not defined
private static final byte IS_NOT_LARGE_MESSAGE = 0;
private static final byte IS_LARGE_MESSAGE = 1;
@@ -97,16 +95,6 @@ public class PagedReferenceImpl extends
LinkedListImpl.Node<PagedReferenceImpl>
private static final byte UNDEFINED_IS_DURABLE = -1;
private byte durable = UNDEFINED_IS_DURABLE;
- @Override
- public Object getProtocolData() {
- return protocolData;
- }
-
- @Override
- public void setProtocolData(Object protocolData) {
- this.protocolData = protocolData;
- }
-
@Override
public Message getMessage() {
return getPagedMessage().getMessage();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
index 91681ff078..ceb402809c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
@@ -177,8 +177,12 @@ public class OperationContextImpl implements
OperationContext {
}
} else {
if (storeOnly) {
- assert !storeOnlyTasks.isEmpty() ?
storeOnlyTasks.peekLast().storeLined <= storeLined : true;
- storeOnlyTasks.add(new StoreOnlyTaskHolder(completion,
storeLined));
+ if (storeLined == stored &&
EXECUTORS_PENDING_UPDATER.get(this) == 0) {
+ executeNow = true;
+ } else {
+ assert !storeOnlyTasks.isEmpty() ?
storeOnlyTasks.peekLast().storeLined <= storeLined : true;
+ storeOnlyTasks.add(new StoreOnlyTaskHolder(completion,
storeLined));
+ }
} else {
// ensure total ordering
assert validateTasksAdd(storeLined, replicationLined,
pageLined);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 8f82a172ea..4c368be6b9 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -224,4 +224,8 @@ public interface PostOffice extends ActiveMQComponent {
default AddressManager getAddressManager() {
return null;
}
+
+ default void preAcknowledge(final Transaction tx, final MessageReference
ref, AckReason reason) {
+ }
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index b9c60f1fb3..3950a4dcaa 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -246,6 +246,18 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
return this;
}
+ @Override
+ public void preAcknowledge(final Transaction tx, final MessageReference
ref, AckReason reason) {
+ if (mirrorControllerSource != null && reason != AckReason.REPLACED) { //
we don't send replacements on LVQ as they are replaced themselves on the target
+ try {
+ mirrorControllerSource.preAcknowledge(tx, ref, reason);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ }
+
+
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) {
if (mirrorControllerSource != null && reason != AckReason.REPLACED) { //
we don't send replacements on LVQ as they are replaced themselves on the target
@@ -1624,7 +1636,7 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
if (mirrorControllerSource != null && !context.isMirrorDisabled()) {
// we check for isMirrorDisabled as to avoid recursive loop from there
- mirrorControllerSource.sendMessage(message, context, refs);
+ mirrorControllerSource.sendMessage(tx, message, context);
}
@@ -1647,10 +1659,12 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
}
}
- private static void processReferences(List<MessageReference> refs, boolean
direct) {
- for (MessageReference ref : refs) {
- ref.getQueue().addTail(ref, direct);
- }
+ public static void processReferences(List<MessageReference> refs, boolean
direct) {
+ refs.forEach((ref) -> processReference(ref, direct));
+ }
+
+ public static void processReference(MessageReference ref, boolean direct) {
+ ref.getQueue().addTail(ref, direct);
}
private void processRouteToDurableQueues(final Message message,
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index daf4f90db1..dfb0c17b50 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -74,13 +74,13 @@ public interface MessageReference {
* To be used on holding protocol specific data during the delivery.
* This will be only valid while the message is on the delivering queue at
the consumer
*/
- Object getProtocolData();
+ <T> T getProtocolData(Class<T> typeClass);
/**
* To be used on holding protocol specific data during the delivery.
* This will be only valid while the message is on the delivering queue at
the consumer
*/
- void setProtocolData(Object data);
+ <T> void setProtocolData(Class<T> typeClass, T data);
MessageReference copy(Queue queue);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AbstractProtocolReference.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AbstractProtocolReference.java
new file mode 100644
index 0000000000..35f3a07548
--- /dev/null
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AbstractProtocolReference.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.HashMap;
+
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+
+/** I need to store protocol specific data on the references. The same need
exists in both PagedReference and MessageReferenceImpl.
+ * This class will serve the purpose to keep the specific protocol data for
either reference.
+ * */
+public abstract class AbstractProtocolReference extends
LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference {
+
+ private HashMap<Class, Object> protocolDataMap;
+
+ @Override
+ public <T> T getProtocolData(Class<T> classType) {
+ if (protocolDataMap == null) {
+ return null;
+ } else {
+ return (T)protocolDataMap.get(classType);
+ }
+ }
+
+ @Override
+ public <T> void setProtocolData(Class<T> classType, T protocolData) {
+ if (protocolDataMap == null) {
+ protocolDataMap = new HashMap<>();
+ }
+ protocolDataMap.put(classType, protocolData);
+ }
+
+}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
index c0b0e02747..10163bdacd 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
@@ -89,13 +89,13 @@ public class GroupFirstMessageReference implements
MessageReference {
}
@Override
- public Object getProtocolData() {
- return messageReference.getProtocolData();
+ public <T> T getProtocolData(Class<T> typeClass) {
+ return messageReference.getProtocolData(typeClass);
}
@Override
- public void setProtocolData(Object data) {
- messageReference.setProtocolData(data);
+ public <T> void setProtocolData(Class<T> typeClass, T data) {
+ messageReference.setProtocolData(typeClass, data);
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 262689aa53..6d39c40372 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -27,12 +27,11 @@ import
org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
/**
* Implementation of a MessageReference
*/
-public class MessageReferenceImpl extends
LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable
{
+public class MessageReferenceImpl extends AbstractProtocolReference implements
MessageReference, Runnable {
private static final MessageReferenceComparatorByID idComparator = new
MessageReferenceComparatorByID();
@@ -78,8 +77,6 @@ public class MessageReferenceImpl extends
LinkedListImpl.Node<MessageReferenceIm
private boolean deliveredDirectly;
- private Object protocolData;
-
private Consumer<? super MessageReference> onDelivery;
@@ -138,15 +135,7 @@ public class MessageReferenceImpl extends
LinkedListImpl.Node<MessageReferenceIm
}
}
- @Override
- public Object getProtocolData() {
- return protocolData;
- }
- @Override
- public void setProtocolData(Object protocolData) {
- this.protocolData = protocolData;
- }
/**
* @return the persistedCount
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index bbc90f56c9..e664cbc998 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1877,9 +1877,11 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
if (logger.isTraceEnabled()) {
- logger.trace("{} acknowledge tx={} ref={}, reason={}, consumer={}",
this, transactional, ref, reason, consumer);
+ logger.trace("queue.acknowledge serverIdentity={}, queue={}
acknowledge tx={} ref={}, reason={}, consumer={}", server.getIdentity(),
this.getName(), transactional, ref, reason, consumer);
}
+ postOffice.preAcknowledge(tx, ref, reason);
+
if (nonDestructive && reason == AckReason.NORMAL) {
if (transactional) {
refsOperation.addOnlyRefAck(ref);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
index 5532d0bf9c..9ecf8d1b17 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorController.java
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.mirror;
-import java.util.List;
-
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -25,7 +23,7 @@ import
org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-
+import org.apache.activemq.artemis.core.transaction.Transaction;
/**
* This represents the contract we will use to send messages to replicas.
@@ -35,9 +33,10 @@ public interface MirrorController {
void deleteAddress(AddressInfo addressInfo) throws Exception;
void createQueue(QueueConfiguration queueConfiguration) throws Exception;
void deleteQueue(SimpleString addressName, SimpleString queueName) throws
Exception;
- void sendMessage(Message message, RoutingContext context,
List<MessageReference> refs);
+ void sendMessage(Transaction tx, Message message, RoutingContext context);
void postAcknowledge(MessageReference ref, AckReason reason) throws
Exception;
+ void preAcknowledge(Transaction tx, MessageReference ref, AckReason reason)
throws Exception;
String getRemoteMirrorId();
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java
index d6c332a6b5..813ef5a098 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java
@@ -37,4 +37,8 @@ public class TransactionPropertyIndexes {
public static final int EXPIRY_LOGGER = 9;
public static final int CONSUMER_METRICS_OPERATION = 10;
+
+ public static final int MIRROR_ACK_OPERATION = 11;
+
+ public static final int MIRROR_SEND_OPERATION = 12;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 13ee0ba73e..536dbf1b42 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -18,11 +18,11 @@ package org.apache.activemq.artemis.core.transaction.impl;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
+import io.netty.util.collection.IntObjectHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -50,7 +50,7 @@ public class TransactionImpl implements Transaction {
private static final int INITIAL_NUM_PROPERTIES = 11;
- private Object[] properties = null;
+ private IntObjectHashMap properties = null;
protected final StorageManager storageManager;
@@ -72,22 +72,6 @@ public class TransactionImpl implements Transaction {
private Object protocolData;
- private void ensurePropertiesCapacity(int capacity) {
- if (properties != null && properties.length >= capacity) {
- return;
- }
- createOrEnlargeProperties(capacity);
- }
-
- private void createOrEnlargeProperties(int capacity) {
- if (properties == null) {
- properties = new
Object[Math.min(TransactionImpl.INITIAL_NUM_PROPERTIES, capacity)];
- } else {
- assert properties.length < capacity;
- properties = Arrays.copyOf(properties, capacity);
- }
- }
-
@Override
public Object getProtocolData() {
return protocolData;
@@ -529,14 +513,17 @@ public class TransactionImpl implements Transaction {
@Override
public void putProperty(final int index, final Object property) {
- ensurePropertiesCapacity(index + 1);
- properties[index] = property;
+ if (properties == null) {
+ properties = new IntObjectHashMap();
+ }
+
+ properties.put(index, property);
}
@Override
public Object getProperty(final int index) {
- return properties == null ? null : (index < properties.length ?
properties[index] : null);
+ return properties == null ? null : properties.get(index);
}
// Private
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index cc8c2cd538..9154d510fd 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2447,6 +2447,14 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="sync" type="xsd:boolean" use="optional"
default="false">
+ <xsd:annotation>
+ <xsd:documentation>
+ If this is true, client blocking operations will wait for a
response from the mirror before unblocking the operation.
+ This is false by default.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:attribute name="address-filter" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index aa5088452e..581cc78ea8 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -709,6 +709,11 @@ public class ConfigurationImplTest extends
ActiveMQTestBase {
@Test
public void testAMQPConnectionsConfiguration() throws Throwable {
+ testAMQPConnectionsConfiguration(true);
+ testAMQPConnectionsConfiguration(false);
+ }
+
+ private void testAMQPConnectionsConfiguration(boolean sync) throws
Throwable {
ConfigurationImpl configuration = new ConfigurationImpl();
Properties insertionOrderedProperties = new
ConfigurationImpl.InsertionOrderedProperties();
@@ -723,6 +728,9 @@ public class ConfigurationImplTest extends ActiveMQTestBase
{
insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.queueCreation",
"true");
insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.queueRemoval",
"true");
insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.addressFilter",
"foo");
+ if (sync) {
+
insertionOrderedProperties.put("AMQPConnections.target.connectionElements.mirror.sync",
"true");
+ } // else we just use the default that is false
configuration.parsePrefixedProperties(insertionOrderedProperties, null);
@@ -742,6 +750,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase
{
Assert.assertEquals(true,
amqpMirrorBrokerConnectionElement.isMessageAcknowledgements());
Assert.assertEquals(true,
amqpMirrorBrokerConnectionElement.isQueueCreation());
Assert.assertEquals(true,
amqpMirrorBrokerConnectionElement.isQueueRemoval());
+ Assert.assertEquals(sync, ((AMQPMirrorBrokerConnectionElement)
amqpBrokerConnectionElement).isSync());
Assert.assertEquals("foo",
amqpMirrorBrokerConnectionElement.getAddressFilter());
}
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 8fec6467f4..8adcc93cf2 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -442,7 +442,7 @@
<receiver address-match="TEST-RECEIVER" />
<peer address-match="TEST-PEER"/>
<receiver queue-name="TEST-WITH-QUEUE-NAME"/>
- <mirror message-acknowledgements="false" queue-creation="false"
durable="false" queue-removal="false"
address-filter="TEST-QUEUE,!IGNORE-QUEUE"/>
+ <mirror message-acknowledgements="false" queue-creation="false"
durable="false" queue-removal="false" address-filter="TEST-QUEUE,!IGNORE-QUEUE"
sync="true"/>
</amqp-connection>
<amqp-connection uri="tcp://test2:222" name="test2">
<mirror durable="false"/>
diff --git a/docs/user-manual/en/amqp-broker-connections.md
b/docs/user-manual/en/amqp-broker-connections.md
index 667ab8ceff..68784e7eee 100644
--- a/docs/user-manual/en/amqp-broker-connections.md
+++ b/docs/user-manual/en/amqp-broker-connections.md
@@ -81,8 +81,7 @@ The previous example portrays a case of connection failure
towards ServerA. The
<div style="page-break-after: always"></div>
## Mirroring
-The idea of mirroring is to send events that happen on a broker towards
another broker, without blocking any operations from producers and consumers,
allowing them to keep operating as fast as possible.
-It can be used for Disaster Recovery, and depending on the requirements even
for failing over the data.
+Mirroring will reproduce any operation that happened on the source brokers
towards a target broker.
The following events are sent through mirroring:
@@ -94,6 +93,8 @@ The following events are sent through mirroring:
* Queue and address creation.
* Queue and address deletion.
+By default every operation is sent asynchronously without blocking any
clients. However, if you set sync="true" on the mirror configuration, the
clients will always wait for the mirror on every blocking operation.
+
### Mirror configuration
Add a `<mirror>` element within the `<amqp-connection>` element to configure
mirroring to the target broker.
@@ -119,9 +120,10 @@ The following optional arguments can be utilized:
matches all addresses starting with 'eu' but not those starting with
'eu.uk'
**Note:**
-
- Address exclusion will always take precedence over address inclusion.
- Address matching on mirror elements is prefix-based and does not support
wild-card matching.
+* `sync`: False by default. If set to true, any client blocking operation
will be held until the mirror has confirmed receiving the operation.
+ * Notice that a disconnected node would hold all operations from the client.
If you set sync=true you must reconnect a mirror before performing any
operations.
An example of a mirror configuration is shown below:
```xml
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReferenceDeliveryAnnotationTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReferenceDeliveryAnnotationTest.java
index 24ddf08d71..7c88310fbf 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReferenceDeliveryAnnotationTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReferenceDeliveryAnnotationTest.java
@@ -77,7 +77,7 @@ public class AmqpReferenceDeliveryAnnotationTest extends
AmqpClientTestSupport {
Map<Symbol, Object> symbolObjectMap = new HashMap<>();
DeliveryAnnotations deliveryAnnotations = new
DeliveryAnnotations(symbolObjectMap);
symbolObjectMap.put(Symbol.getSymbol("KEY"), uuid);
- reference.setProtocolData(deliveryAnnotations);
+ reference.setProtocolData(DeliveryAnnotations.class,
deliveryAnnotations);
}
});
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index c717bdaea5..841c8ec1a6 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -681,9 +681,11 @@ public class AMQPReplicaTest extends AmqpClientTestSupport
{
server_2.getConfiguration().setName("thisone");
AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" +
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
- AMQPMirrorBrokerConnectionElement replica = new
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(acks);
+ AMQPMirrorBrokerConnectionElement replica = new
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(acks).setDurable(true);
+ replica.setName("theReplica");
amqpConnection.addElement(replica);
server_2.getConfiguration().addAMQPConnection(amqpConnection);
+ server_2.getConfiguration().setName("server_2");
int NUMBER_OF_MESSAGES = 200;
@@ -698,7 +700,6 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
Connection connection = factory.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer =
session.createProducer(session.createQueue(getQueueName()));
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
if (!deferredStart) {
Queue queueOnServer1 = locateQueue(server, getQueueName());
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java
new file mode 100644
index 0000000000..5b399cfe33
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+   // One logger per class is enough; it is shared by the storage callbacks and the test body.
+   private static final Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+   // Name and acceptor port of the mirror target whose journal writes are throttled by the tests.
+   private static final String SLOW_SERVER_NAME = "slow";
+   private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+   // Mirror target; created by testPersistedSend before the servers are started.
+   private ActiveMQServer slowServer;
+
+ /**
+  * Runs the inherited fixture setup; this test adds no initialisation of its own here.
+  */
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ /**
+  * @return the comma separated protocol list used for the acceptors of this test
+  */
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,OPENWIRE,CORE";
+ }
+
+ // The tests below all delegate to testPersistedSend(protocol, transactional, messageSize).
+ // messageSize is the number of times the body fragment is repeated, not a byte count.
+
+ // AMQP client, non transactional, small body
+ @Test
+ public void testPersistedSendAMQP() throws Exception {
+ testPersistedSend("AMQP", false, 100);
+ }
+
+ // AMQP client, non transactional, body built from 200 * 1024 repetitions
+ @Test
+ public void testPersistedSendAMQPLarge() throws Exception {
+ testPersistedSend("AMQP", false, 200 * 1024);
+ }
+
+
+ // CORE client, non transactional, small body
+ @Test
+ public void testPersistedSendCore() throws Exception {
+ testPersistedSend("CORE", false, 100);
+ }
+
+ // CORE client, non transactional, body built from 200 * 1024 repetitions
+ @Test
+ public void testPersistedSendCoreLarge() throws Exception {
+ testPersistedSend("CORE", false, 200 * 1024);
+ }
+
+ // AMQP client, transacted session, body built from 200 * 1024 repetitions
+ @Test
+ public void testPersistedSendAMQPTXLarge() throws Exception {
+ testPersistedSend("AMQP", true, 200 * 1024);
+ }
+
+ // AMQP client, transacted session, small body
+ @Test
+ public void testPersistedSendAMQPTX() throws Exception {
+ testPersistedSend("AMQP", true, 100);
+ }
+
+ // CORE client, transacted session, small body
+ @Test
+ public void testPersistedSendCoreTX() throws Exception {
+ testPersistedSend("CORE", true, 100);
+ }
+
+ // CORE client, transacted session, body built from 200 * 1024 repetitions
+ @Test
+ public void testPersistedSendCoreTXLarge() throws Exception {
+ testPersistedSend("CORE", true, 200 * 1024);
+ }
+
+   /**
+    * Sends and then consumes persistent messages on {@code server}, which mirrors with
+    * {@code sync=true} towards {@code slowServer}, and checks that each blocking client
+    * operation only completes once the slow server was allowed to write to its journal.
+    * <p>
+    * The slow server's journal callback parks on {@code semSend} (message adds / commits) and
+    * on {@code semAck} (acknowledgement records). The test thread holds the matching semaphore
+    * while the client operation runs on a pool thread, asserts the operation has not finished,
+    * then releases the semaphore and asserts that it does finish.
+    *
+    * @param protocol      protocol name handed to {@code CFUtil.createConnectionFactory}
+    * @param transactional whether the producing and consuming sessions are transacted
+    * @param messageSize   how many times the fragment {@code "large Buffer..."} is repeated in the body
+    * @throws Exception on any setup or messaging failure
+    */
+   private void testPersistedSend(String protocol, boolean transactional, int messageSize) throws Exception {
+      ReusableLatch sendPending = new ReusableLatch(0);
+      Semaphore semSend = new Semaphore(1);
+      Semaphore semAck = new Semaphore(1);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      try {
+         final int NUMBER_OF_MESSAGES = 10;
+
+         AtomicInteger countStored = new AtomicInteger(0);
+
+         // The second lambda argument is StorageCallback's isCommit flag: it is only true when
+         // the callback is fired from appendCommitRecord.
+         slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT, SLOW_SERVER_NAME, (isUpdate, isCommit, txId, id, recordType, persister, record) -> {
+            if (logger.isDebugEnabled()) {
+               logger.debug("StorageCallback::slow isUpdate={}, isTX={}, txID={}, id={},recordType={}, record={}", isUpdate, isCommit, txId, id, recordType, record);
+            }
+            if (transactional) {
+               if (isCommit) {
+                  try {
+                     // only throttle commits once at least one message record was seen
+                     if (countStored.get() > 0) {
+                        countStored.incrementAndGet();
+                        logger.debug("semSend.tryAcquire");
+                        if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                           logger.debug("acquired TX, now release");
+                           semSend.release();
+                        }
+                     }
+                  } catch (Exception e) {
+                     logger.warn(e.getMessage(), e);
+                  }
+               }
+            }
+            if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+               logger.debug("slow ACK REF");
+               try {
+                  if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+                     semAck.release();
+                     logger.debug("slow acquired ACK semaphore");
+                  } else {
+                     logger.debug("Semaphore wasn't acquired");
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+            if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+               try {
+                  countStored.incrementAndGet();
+                  if (!transactional) {
+                     logger.debug("semSend.tryAcquire");
+                     if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+                        logger.debug("acquired non TX now release");
+                        semSend.release();
+                     }
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               }
+            }
+         });
+         slowServer.setIdentity("slowServer");
+         server.setIdentity("server");
+
+         ExecutorService pool = Executors.newFixedThreadPool(5);
+         runAfter(pool::shutdown);
+
+         configureMirrorTowardsSlow(server);
+
+         slowServer.getConfiguration().setName("slow");
+         server.getConfiguration().setName("fast");
+         slowServer.start();
+         server.start();
+
+         waitForServerToStart(slowServer);
+         waitForServerToStart(server);
+
+         server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+         server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+         Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+         Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+         // fail with a clear message instead of a later NullPointerException
+         Assert.assertNotNull("queue was not mirrored to the slow server", replicatedQueue);
+
+         ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + AMQP_PORT);
+
+         if (factory instanceof ActiveMQConnectionFactory) {
+            ((ActiveMQConnectionFactory) factory).getServerLocator().setBlockOnAcknowledge(true);
+         }
+
+         Connection connection = factory.createConnection();
+         runAfter(connection::close);
+         Session session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
+
+         // started once here; the same connection is reused for the consumer further down
+         connection.start();
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         final String bodyMessage;
+         {
+            // confined to this thread, so the unsynchronized builder is sufficient
+            StringBuilder buffer = new StringBuilder();
+            for (int i = 0; i < messageSize; i++) {
+               buffer.append("large Buffer...");
+            }
+            bodyMessage = buffer.toString();
+         }
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            logger.debug("===>>> send message {}", i);
+            int theI = i;
+            sendPending.countUp();
+            logger.debug("semSend.acquire");
+            // hold the permit so the slow server's journal callback has to wait
+            semSend.acquire();
+            if (!transactional) {
+               pool.execute(() -> {
+                  try {
+                     logger.debug("Entering non TX send with sendPending = {}", sendPending.getCount());
+                     TextMessage message = session.createTextMessage(bodyMessage);
+                     message.setStringProperty("strProperty", "" + theI);
+                     producer.send(message);
+                     sendPending.countDown();
+                     logger.debug("leaving non TX send with sendPending = {}", sendPending.getCount());
+                  } catch (Throwable e) {
+                     logger.warn(e.getMessage(), e);
+                     errors.incrementAndGet();
+                  }
+               });
+            } else {
+               CountDownLatch sendDone = new CountDownLatch(1);
+               pool.execute(() -> {
+                  try {
+                     TextMessage message = session.createTextMessage(bodyMessage);
+                     message.setStringProperty("strProperty", "" + theI);
+                     producer.send(message);
+                  } catch (Throwable e) {
+                     errors.incrementAndGet();
+                     logger.warn(e.getMessage(), e);
+                  }
+                  sendDone.countDown();
+               });
+
+               // nothing from the open transaction may be visible on the mirror yet
+               Wait.assertEquals(i, replicatedQueue::getMessageCount);
+
+               Assert.assertTrue(sendDone.await(10, TimeUnit.SECONDS));
+
+               pool.execute(() -> {
+                  try {
+                     session.commit();
+                     sendPending.countDown();
+                  } catch (Throwable e) {
+                     logger.warn(e.getMessage(), e);
+                     // count it like every other client-side failure so the final assertion reports it
+                     errors.incrementAndGet();
+                  }
+               });
+            }
+
+            Assert.assertFalse("sendPending.await() not supposed to succeed", sendPending.await(10, TimeUnit.MILLISECONDS));
+            logger.debug("semSend.release");
+            semSend.release();
+            Assert.assertTrue(sendPending.await(10, TimeUnit.SECONDS));
+            Wait.assertEquals(i + 1, replicatedQueue::getMessageCount);
+         }
+
+         if (!transactional) {
+            Wait.assertEquals(NUMBER_OF_MESSAGES, countStored::get);
+         }
+         Wait.assertEquals(NUMBER_OF_MESSAGES, replicatedQueue::getMessageCount);
+
+         Session clientSession = transactional ? connection.createSession(true, Session.AUTO_ACKNOWLEDGE) : connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer consumer = clientSession.createConsumer(clientSession.createQueue(getQueueName()));
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            logger.debug("===<<< Receiving message {}", i);
+            Message message = consumer.receive(5000);
+            Assert.assertNotNull(message);
+            // hold the permit so the slow server cannot store the acknowledgement yet
+            semAck.acquire();
+
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            pool.execute(() -> {
+               try {
+                  if (transactional) {
+                     clientSession.commit();
+                  } else {
+                     message.acknowledge();
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               } finally {
+                  countDownLatch.countDown();
+               }
+            });
+
+            if (!transactional && protocol.equals("AMQP")) {
+               // non transactional ack in AMQP is always async. No need to verify anything else here
+               logger.debug("non transactional and amqp is always asynchronous. No need to verify anything");
+            } else {
+               Assert.assertFalse(countDownLatch.await(10, TimeUnit.MILLISECONDS));
+            }
+
+            semAck.release();
+            Assert.assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
+            Wait.assertEquals(NUMBER_OF_MESSAGES - i - 1, replicatedQueue::getMessageCount);
+         }
+
+         Assert.assertEquals(0, errors.get());
+      } finally {
+         // always hand permits back so journal callbacks parked in tryAcquire are not left waiting
+         semAck.release();
+         semSend.release();
+      }
+   }
+
+   /**
+    * Creates the mirror source ("fast") server on {@code AMQP_PORT}. Its storage callback only
+    * logs every journal operation at debug level and never delays it.
+    *
+    * @return the created server; it is not started here
+    * @throws Exception if the server cannot be created
+    */
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      // createServerWithCallbackStorage already passes the server to addServer, so it must not
+      // be registered a second time here.
+      return createServerWithCallbackStorage(AMQP_PORT, "fastServer", (isUpdate, isCommit, txId, id, recordType, persister, record) -> {
+         if (logger.isDebugEnabled()) {
+            logger.debug("StorageCallback::fast isUpdate={}, isTX={}, txID={}, id={},recordType={}, record={}", isUpdate, isCommit, txId, id, recordType, record);
+         }
+      });
+   }
+
+ private void configureMirrorTowardsSlow(ActiveMQServer source) {
+ AMQPBrokerConnectConfiguration connection = new
AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:" +
SLOW_SERVER_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPMirrorBrokerConnectionElement replication = new
AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(true).setMessageAcknowledgements(true);
+ connection.addElement(replication);
+
+ source.getConfiguration().addAMQPConnection(connection);
+ }
+
+   /**
+    * Builds (without starting) a server whose storage manager reports every journal append to
+    * {@code storageCallback} before performing it, and registers the server through
+    * {@code addServer}.
+    *
+    * @param port            port used for the basic configuration and for the single acceptor
+    * @param name            configuration name given to the server
+    * @param storageCallback observer invoked ahead of each intercepted journal operation
+    * @return the configured server
+    * @throws Exception if the configuration cannot be created
+    */
+   private ActiveMQServer createServerWithCallbackStorage(int port, String name, StorageCallback storageCallback) throws Exception {
+      ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
+      ActiveMQServer server = new ActiveMQServerImpl(createBasicConfig(port), mBeanServer, securityManager) {
+         @Override
+         protected StorageManager createStorageManager() {
+            return AMQPSyncMirrorTest.this.createCallbackStorageManager(getConfiguration(), getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener, storageCallback);
+         }
+      };
+
+      server.getConfiguration().setName(name);
+      // replace whatever acceptors the basic configuration brought with a single one on 'port'
+      server.getConfiguration().getAcceptorConfigurations().clear();
+      // Pass the server being built: the slowServer field is only assigned after this method
+      // returns, so it is null or stale at this point.
+      server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(server, port));
+      server.getConfiguration().setMessageExpiryScanPeriod(-1);
+
+      server.getConfiguration().setJMXManagementEnabled(true);
+
+      configureAddressPolicy(server);
+      configureBrokerSecurity(server);
+
+      addServer(server);
+
+      return server;
+   }
+
+ /**
+  * Observer called by the journal created in createCallbackStorageManager right before each
+  * intercepted append is delegated to the real journal.
+  * <p>
+  * isUpdate is true for the update-record overrides; isCommit is true only when called from
+  * appendCommitRecord; txID is -1 for the non transactional overrides; persister is null for
+  * the EncodingSupport based updates and for commits; record is null for commits.
+  */
+ private interface StorageCallback {
+ void storage(boolean isUpdate,
+ boolean isCommit,
+ long txID,
+ long id,
+ byte recordType,
+ Persister persister,
+ Object record);
+ }
+
+   /**
+    * Creates a {@link JournalStorageManager} whose message journal notifies
+    * {@code storageCallback} before every add, update and commit append, and then delegates to
+    * the regular {@link JournalImpl} implementation. Because the callback runs first, a
+    * callback that blocks delays the journal write itself.
+    *
+    * @param configuration           server configuration handed to the storage manager
+    * @param criticalAnalyzer        critical analyzer handed to the storage manager
+    * @param executorFactory         executor factory handed to the storage manager
+    * @param scheduledPool           scheduled pool handed to the storage manager
+    * @param ioExecutorFactory       IO executor factory handed to the storage manager and the journal
+    * @param ioCriticalErrorListener IO error listener handed to the storage manager
+    * @param storageCallback         observer invoked ahead of each intercepted journal operation
+    * @return the instrumented storage manager
+    */
+   private StorageManager createCallbackStorageManager(Configuration configuration,
+                                                       CriticalAnalyzer criticalAnalyzer,
+                                                       ExecutorFactory executorFactory,
+                                                       ScheduledExecutorService scheduledPool,
+                                                       ExecutorFactory ioExecutorFactory,
+                                                       IOCriticalErrorListener ioCriticalErrorListener,
+                                                       StorageCallback storageCallback) {
+      return new JournalStorageManager(configuration, criticalAnalyzer, executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
+         @Override
+         protected Journal createMessageJournal(Configuration config,
+                                                IOCriticalErrorListener criticalErrorListener,
+                                                int fileSize) {
+            return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()) {
+               @Override
+               public void appendAddRecordTransactional(long txID,
+                                                        long id,
+                                                        byte recordType,
+                                                        Persister persister,
+                                                        Object record) throws Exception {
+                  storageCallback.storage(false, false, txID, id, recordType, persister, record);
+                  super.appendAddRecordTransactional(txID, id, recordType, persister, record);
+               }
+
+               @Override
+               public void appendAddRecord(long id,
+                                           byte recordType,
+                                           Persister persister,
+                                           Object record,
+                                           boolean sync,
+                                           IOCompletion callback) throws Exception {
+                  // -1 marks "no transaction"
+                  storageCallback.storage(false, false, -1, id, recordType, persister, record);
+                  super.appendAddRecord(id, recordType, persister, record, sync, callback);
+               }
+
+               @Override
+               public void appendUpdateRecord(long id,
+                                              byte recordType,
+                                              EncodingSupport record,
+                                              boolean sync) throws Exception {
+                  storageCallback.storage(true, false, -1, id, recordType, null, record);
+                  super.appendUpdateRecord(id, recordType, record, sync);
+               }
+
+               @Override
+               public void appendUpdateRecordTransactional(long txID,
+                                                           long id,
+                                                           byte recordType,
+                                                           EncodingSupport record) throws Exception {
+                  storageCallback.storage(true, false, txID, id, recordType, null, record);
+                  super.appendUpdateRecordTransactional(txID, id, recordType, record);
+               }
+
+               @Override
+               public void appendCommitRecord(long txID,
+                                              boolean sync,
+                                              IOCompletion callback,
+                                              boolean lineUpContext) throws Exception {
+                  // a commit has no record of its own: the transaction id doubles as the id
+                  storageCallback.storage(false, true, txID, txID, (byte)0, null, null);
+                  super.appendCommitRecord(txID, sync, callback, lineUpContext);
+               }
+
+               @Override
+               public void tryAppendUpdateRecord(long id,
+                                                 byte recordType,
+                                                 Persister persister,
+                                                 Object record,
+                                                 boolean sync,
+                                                 boolean replaceableUpdate,
+                                                 JournalUpdateCallback updateCallback,
+                                                 IOCompletion callback) throws Exception {
+                  // report the real record id, as every other override does
+                  storageCallback.storage(true, false, -1, id, recordType, persister, record);
+                  super.tryAppendUpdateRecord(id, recordType, persister, record, sync, replaceableUpdate, updateCallback, callback);
+               }
+            };
+         }
+      };
+   }
+}
\ No newline at end of file
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConfigurationValidationTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConfigurationValidationTest.java
index 0a690c2a78..8b332fdf02 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConfigurationValidationTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConfigurationValidationTest.java
@@ -94,6 +94,7 @@ public class ConfigurationValidationTest extends
ActiveMQTestBase {
Assert.assertFalse(mirrorConnectionElement.isQueueCreation());
Assert.assertFalse(mirrorConnectionElement.isQueueRemoval());
Assert.assertFalse(mirrorConnectionElement.isDurable());
+ Assert.assertTrue(mirrorConnectionElement.isSync());
amqpBrokerConnectConfiguration = fc.getAMQPConnection().get(1);
@@ -104,6 +105,7 @@ public class ConfigurationValidationTest extends
ActiveMQTestBase {
Assert.assertFalse(mirrorConnectionElement.isDurable());
Assert.assertTrue(mirrorConnectionElement.isQueueCreation());
Assert.assertTrue(mirrorConnectionElement.isQueueRemoval());
+ Assert.assertFalse(mirrorConnectionElement.isSync()); // checking the
default
amqpBrokerConnectConfiguration = fc.getAMQPConnection().get(2);
Assert.assertFalse(amqpBrokerConnectConfiguration.isAutostart());