[
https://issues.apache.org/jira/browse/ARTEMIS-4136?focusedWorklogId=840411&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840411
]
ASF GitHub Bot logged work on ARTEMIS-4136:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 19/Jan/23 18:30
Start Date: 19/Jan/23 18:30
Worklog Time Spent: 10m
Work Description: tabish121 commented on code in PR #4338:
URL: https://github.com/apache/activemq-artemis/pull/4338#discussion_r1081606585
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext
context, List<MessageRef
try {
context.setReusable(false);
- MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
- String nodeID = setProtocolData(idSupplier, ref);
+ String nodeID = idSupplier.getServerID(message);
+
if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
logger.trace("Message {} already belonged to the node, {}, it
won't circle send", message, getRemoteMirrorId());
return;
}
+
+ MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
+ setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
snfQueue.refUp(ref);
+ List<MessageReference> refs;
+
+ if (tx != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Mirroring Message " + message + " with TX");
+ }
+ MirrorSendOperation mirrorSendOperation = getSendOperation(tx);
+ refs = mirrorSendOperation.mirroredRefs;
+ } else {
+ refs = new LinkedList<>();
+ }
+
refs.add(ref);
+ if (sync) {
+ OperationContext operContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operContext.replicationLineUp();
+ }
+ if (logger.isDebugEnabled()) {
Review Comment:
Gate check isn't needed here.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +404,185 @@ public void postAcknowledge(MessageReference ref,
AckReason reason) throws Excep
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() ||
ref.getQueue().isMirrorController()))) {
if (logger.isDebugEnabled()) {
- logger.debug("{} rejecting postAcknowledge queue={}, ref={} to
avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
+ logger.debug("{} rejecting preAcknowledge queue={}, ref={} to
avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
}
return;
}
if (ignoreAddress(ref.getQueue().getAddress())) {
if (logger.isTraceEnabled()) {
- logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue
address is excluded", server, ref.getQueue().getName(), ref);
+ logger.trace("{} rejecting preAcknowledge queue={}, ref={}, queue
address is excluded", server, ref.getQueue().getName(), ref);
}
return;
}
- logger.trace("{} postAcknowledge {}", server, ref);
+ logger.trace("{} preAcknowledge {}", server, ref);
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will
be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
- if (logger.isTraceEnabled()) {
- logger.trace("{} sending ack message from server {} with
messageID={}", server, nodeID, internalID);
+ Message messageCommand = createMessage(ref.getQueue().getAddress(),
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+ if (sync) {
+ OperationContext operationContext;
+ operationContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ messageCommand.setUserContext(OperationContext.class,
operationContext);
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operationContext.replicationLineUp();
+ }
+ }
+
+ if (tx != null) {
+ MirrorACKOperation operation = getAckOperation(tx);
+ // notice the operationContext.replicationLineUp is done on
beforeCommit as part of the TX
+ operation.addMessage(messageCommand, ref);
+ } else {
+ server.getStorageManager().afterStoreOperations(new IOCallback() {
+ @Override
+ public void done() {
+ try {
+ logger.debug("Routing ack out of message {}", ref);
+ route(server, messageCommand);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+ });
+ }
+ }
+
+ private MirrorACKOperation getAckOperation(Transaction tx) {
+ MirrorACKOperation ackOperation = (MirrorACKOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+ if (ackOperation == null) {
+ logger.trace("getAckOperation::setting operation on transaction {}",
tx);
+ ackOperation = new MirrorACKOperation(server);
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION,
ackOperation);
+ tx.afterStore(ackOperation);
+ }
+
+ return ackOperation;
+ }
+
+ private MirrorSendOperation getSendOperation(Transaction tx) {
+ if (tx == null) {
+ return null;
+ }
+ MirrorSendOperation sendOperation = (MirrorSendOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+ if (sendOperation == null) {
+ logger.trace("getSendOperation::setting operation on transaction {}",
tx);
+ sendOperation = new MirrorSendOperation();
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION,
sendOperation);
+ tx.afterStore(sendOperation);
+ }
+
+ return sendOperation;
+ }
+
+ private static class MirrorACKOperation extends
TransactionOperationAbstract {
+
+ final ActiveMQServer server;
+
+ final LinkedList<MessageReference> refs = new LinkedList<>();
+ final LinkedList<Message> messages = new LinkedList<>();
+
+ MirrorACKOperation(ActiveMQServer server) {
+ this.server = server;
+ }
+
+ /**
+ *
+ * @param message the message with the instruction to ack on the target
node. Notice this is not the message owned by the reference.
+ * @param ref the reference being acked
+ */
+ public void addMessage(Message message, MessageReference ref) {
+ refs.add(ref);
+ messages.add(message);
+ }
+
+ @Override
+ public void beforeCommit(Transaction tx) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("MirrorACKOperation::beforeCommit processing {}",
messages);
+ }
+ for (Message message : messages) {
+ OperationContext context = (OperationContext)
message.getUserContext(OperationContext.class);
+ if (context != null) {
+ context.replicationLineUp();
+ }
+ }
+ }
+
+ @Override
+ public void afterCommit(Transaction tx) {
+ if (logger.isDebugEnabled()) {
Review Comment:
Unneeded gate check
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext
context, List<MessageRef
try {
context.setReusable(false);
- MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
- String nodeID = setProtocolData(idSupplier, ref);
+ String nodeID = idSupplier.getServerID(message);
+
if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
logger.trace("Message {} already belonged to the node, {}, it
won't circle send", message, getRemoteMirrorId());
return;
}
+
+ MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
+ setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
snfQueue.refUp(ref);
+ List<MessageReference> refs;
+
+ if (tx != null) {
+ if (logger.isDebugEnabled()) {
Review Comment:
Remove the isDebugEnabled() gate checks, as they are useless unless there are
more than two args; also fix the logger call to use parameterized {} formatting
instead of string concatenation, to avoid the unnecessary toString.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -303,8 +379,18 @@ private static Properties getProperties(Message message) {
}
}
+ private void postACKInternalMessage(MessageReference reference) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("PostAckInternal::server={}, ref={}",
server.getIdentity(), reference);
+ }
+ if (sync) {
+ syncDone(reference);
+ }
+ }
+
@Override
- public void postAcknowledge(MessageReference ref, AckReason reason) throws
Exception {
+ public void preAcknowledge(final Transaction tx, final MessageReference
ref, final AckReason reason) throws Exception {
+ logger.trace("preAcknowledge tx={}, ref={}, reason={}", tx, ref, reason);
Review Comment:
This one actually could use a gate check to avoid the Object[] (varargs) allocation, since it passes three arguments.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +404,185 @@ public void postAcknowledge(MessageReference ref,
AckReason reason) throws Excep
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() ||
ref.getQueue().isMirrorController()))) {
if (logger.isDebugEnabled()) {
- logger.debug("{} rejecting postAcknowledge queue={}, ref={} to
avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
+ logger.debug("{} rejecting preAcknowledge queue={}, ref={} to
avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
}
return;
}
if (ignoreAddress(ref.getQueue().getAddress())) {
if (logger.isTraceEnabled()) {
- logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue
address is excluded", server, ref.getQueue().getName(), ref);
+ logger.trace("{} rejecting preAcknowledge queue={}, ref={}, queue
address is excluded", server, ref.getQueue().getName(), ref);
}
return;
}
- logger.trace("{} postAcknowledge {}", server, ref);
+ logger.trace("{} preAcknowledge {}", server, ref);
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will
be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
- if (logger.isTraceEnabled()) {
- logger.trace("{} sending ack message from server {} with
messageID={}", server, nodeID, internalID);
+ Message messageCommand = createMessage(ref.getQueue().getAddress(),
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+ if (sync) {
+ OperationContext operationContext;
+ operationContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ messageCommand.setUserContext(OperationContext.class,
operationContext);
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operationContext.replicationLineUp();
+ }
+ }
+
+ if (tx != null) {
+ MirrorACKOperation operation = getAckOperation(tx);
+ // notice the operationContext.replicationLineUp is done on
beforeCommit as part of the TX
+ operation.addMessage(messageCommand, ref);
+ } else {
+ server.getStorageManager().afterStoreOperations(new IOCallback() {
+ @Override
+ public void done() {
+ try {
+ logger.debug("Routing ack out of message {}", ref);
+ route(server, messageCommand);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+ });
+ }
+ }
+
+ private MirrorACKOperation getAckOperation(Transaction tx) {
+ MirrorACKOperation ackOperation = (MirrorACKOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+ if (ackOperation == null) {
+ logger.trace("getAckOperation::setting operation on transaction {}",
tx);
+ ackOperation = new MirrorACKOperation(server);
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION,
ackOperation);
+ tx.afterStore(ackOperation);
+ }
+
+ return ackOperation;
+ }
+
+ private MirrorSendOperation getSendOperation(Transaction tx) {
+ if (tx == null) {
+ return null;
+ }
+ MirrorSendOperation sendOperation = (MirrorSendOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+ if (sendOperation == null) {
+ logger.trace("getSendOperation::setting operation on transaction {}",
tx);
+ sendOperation = new MirrorSendOperation();
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION,
sendOperation);
+ tx.afterStore(sendOperation);
+ }
+
+ return sendOperation;
+ }
+
+ private static class MirrorACKOperation extends
TransactionOperationAbstract {
+
+ final ActiveMQServer server;
+
+ final LinkedList<MessageReference> refs = new LinkedList<>();
+ final LinkedList<Message> messages = new LinkedList<>();
+
+ MirrorACKOperation(ActiveMQServer server) {
+ this.server = server;
+ }
+
+ /**
+ *
+ * @param message the message with the instruction to ack on the target
node. Notice this is not the message owned by the reference.
+ * @param ref the reference being acked
+ */
+ public void addMessage(Message message, MessageReference ref) {
+ refs.add(ref);
+ messages.add(message);
+ }
+
+ @Override
+ public void beforeCommit(Transaction tx) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("MirrorACKOperation::beforeCommit processing {}",
messages);
+ }
+ for (Message message : messages) {
+ OperationContext context = (OperationContext)
message.getUserContext(OperationContext.class);
+ if (context != null) {
+ context.replicationLineUp();
+ }
+ }
+ }
+
+ @Override
+ public void afterCommit(Transaction tx) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("MirrorACKOperation::afterCommit processing {}",
messages);
+ }
+ for (Message message : messages) {
+ try {
+ route(server, message);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ for (MessageReference ref : refs) {
+ ref.getMessage().usageDown();
+ }
+ }
+
+ @Override
+ public void afterRollback(Transaction tx) {
+ for (Message message : messages) {
+ OperationContext context = (OperationContext)
message.getUserContext(OperationContext.class);
+ context.replicationDone();
+ }
+ }
+
+ }
+
+ private static final class MirrorSendOperation extends
TransactionOperationAbstract {
+ final List<MessageReference> mirroredRefs = new LinkedList<>();
+
+ @Override
+ public void beforeCommit(Transaction tx) {
+ for (MessageReference ref : mirroredRefs) {
+ OperationContext context =
ref.getProtocolData(OperationContext.class);
+ if (context != null) {
+ context.replicationLineUp();
+ }
+ }
+ }
+
+ @Override
+ public void afterRollback(Transaction tx) {
+ if (logger.isDebugEnabled()) {
Review Comment:
Unneeded gate check
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -318,28 +404,185 @@ public void postAcknowledge(MessageReference ref,
AckReason reason) throws Excep
if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() ||
ref.getQueue().isMirrorController()))) {
if (logger.isDebugEnabled()) {
- logger.debug("{} rejecting postAcknowledge queue={}, ref={} to
avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
+ logger.debug("{} rejecting preAcknowledge queue={}, ref={} to
avoid infinite loop with the mirror (reflection)", server,
ref.getQueue().getName(), ref);
}
return;
}
if (ignoreAddress(ref.getQueue().getAddress())) {
if (logger.isTraceEnabled()) {
- logger.trace("{} rejecting postAcknowledge queue={}, ref={}, queue
address is excluded", server, ref.getQueue().getName(), ref);
+ logger.trace("{} rejecting preAcknowledge queue={}, ref={}, queue
address is excluded", server, ref.getQueue().getName(), ref);
}
return;
}
- logger.trace("{} postAcknowledge {}", server, ref);
+ logger.trace("{} preAcknowledge {}", server, ref);
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will
be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
- if (logger.isTraceEnabled()) {
- logger.trace("{} sending ack message from server {} with
messageID={}", server, nodeID, internalID);
+ Message messageCommand = createMessage(ref.getQueue().getAddress(),
ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
+ if (sync) {
+ OperationContext operationContext;
+ operationContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ messageCommand.setUserContext(OperationContext.class,
operationContext);
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operationContext.replicationLineUp();
+ }
+ }
+
+ if (tx != null) {
+ MirrorACKOperation operation = getAckOperation(tx);
+ // notice the operationContext.replicationLineUp is done on
beforeCommit as part of the TX
+ operation.addMessage(messageCommand, ref);
+ } else {
+ server.getStorageManager().afterStoreOperations(new IOCallback() {
+ @Override
+ public void done() {
+ try {
+ logger.debug("Routing ack out of message {}", ref);
+ route(server, messageCommand);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+ });
+ }
+ }
+
+ private MirrorACKOperation getAckOperation(Transaction tx) {
+ MirrorACKOperation ackOperation = (MirrorACKOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION);
+ if (ackOperation == null) {
+ logger.trace("getAckOperation::setting operation on transaction {}",
tx);
+ ackOperation = new MirrorACKOperation(server);
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_ACK_OPERATION,
ackOperation);
+ tx.afterStore(ackOperation);
+ }
+
+ return ackOperation;
+ }
+
+ private MirrorSendOperation getSendOperation(Transaction tx) {
+ if (tx == null) {
+ return null;
+ }
+ MirrorSendOperation sendOperation = (MirrorSendOperation)
tx.getProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION);
+ if (sendOperation == null) {
+ logger.trace("getSendOperation::setting operation on transaction {}",
tx);
+ sendOperation = new MirrorSendOperation();
+ tx.putProperty(TransactionPropertyIndexes.MIRROR_SEND_OPERATION,
sendOperation);
+ tx.afterStore(sendOperation);
+ }
+
+ return sendOperation;
+ }
+
+ private static class MirrorACKOperation extends
TransactionOperationAbstract {
+
+ final ActiveMQServer server;
+
+ final LinkedList<MessageReference> refs = new LinkedList<>();
+ final LinkedList<Message> messages = new LinkedList<>();
+
+ MirrorACKOperation(ActiveMQServer server) {
+ this.server = server;
+ }
+
+ /**
+ *
+ * @param message the message with the instruction to ack on the target
node. Notice this is not the message owned by the reference.
+ * @param ref the reference being acked
+ */
+ public void addMessage(Message message, MessageReference ref) {
+ refs.add(ref);
+ messages.add(message);
+ }
+
+ @Override
+ public void beforeCommit(Transaction tx) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("MirrorACKOperation::beforeCommit processing {}",
messages);
+ }
+ for (Message message : messages) {
+ OperationContext context = (OperationContext)
message.getUserContext(OperationContext.class);
+ if (context != null) {
+ context.replicationLineUp();
+ }
+ }
+ }
+
+ @Override
+ public void afterCommit(Transaction tx) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("MirrorACKOperation::afterCommit processing {}",
messages);
+ }
+ for (Message message : messages) {
+ try {
+ route(server, message);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ for (MessageReference ref : refs) {
+ ref.getMessage().usageDown();
+ }
+ }
+
+ @Override
+ public void afterRollback(Transaction tx) {
+ for (Message message : messages) {
+ OperationContext context = (OperationContext)
message.getUserContext(OperationContext.class);
+ context.replicationDone();
+ }
+ }
+
+ }
+
+ private static final class MirrorSendOperation extends
TransactionOperationAbstract {
+ final List<MessageReference> mirroredRefs = new LinkedList<>();
+
+ @Override
+ public void beforeCommit(Transaction tx) {
+ for (MessageReference ref : mirroredRefs) {
+ OperationContext context =
ref.getProtocolData(OperationContext.class);
+ if (context != null) {
+ context.replicationLineUp();
+ }
+ }
+ }
+
+ @Override
+ public void afterRollback(Transaction tx) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("rolling back context for {} times",
mirroredRefs.size());
+ }
+ for (MessageReference mirroredRef : mirroredRefs) {
+ OperationContext localCTX =
mirroredRef.getProtocolData(OperationContext.class);
+ if (localCTX != null) {
+ localCTX.replicationDone();
+ }
+ }
+ }
+
+ @Override
+ public void afterCommit(Transaction tx) {
+ if (logger.isTraceEnabled()) {
Review Comment:
Ditto: unneeded gate check.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -128,11 +133,17 @@ public void storeLineUp() {
@Override
public void replicationLineUp() {
REPLICATION_LINEUP_UPDATER.incrementAndGet(this);
+ if (logger.isTraceEnabled()) {
Review Comment:
Unneeded gate check
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+ Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+ private static final String SLOW_SERVER_NAME = "slow";
+ private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+ private ActiveMQServer slowServer;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,OPENWIRE,CORE";
+ }
+
+ @Test
+ public void testPersistedSendAMQP() throws Exception {
+ testPersistedSend("AMQP", false, 100);
+ }
+
+ @Test
+ public void testPersistedSendAMQPLarge() throws Exception {
+ testPersistedSend("AMQP", false, 200 * 1024);
+ }
+
+
+ @Test
+ public void testPersistedSendCore() throws Exception {
+ testPersistedSend("CORE", false, 100);
+ }
+
+ @Test
+ public void testPersistedSendCoreLarge() throws Exception {
+ testPersistedSend("CORE", false, 200 * 1024);
+ }
+
+ @Test
+ public void testPersistedSendAMQPTXLarge() throws Exception {
+ testPersistedSend("AMQP", true, 200 * 1024);
+ }
+
+ @Test
+ public void testPersistedSendAMQPTX() throws Exception {
+ testPersistedSend("AMQP", true, 100);
+ }
+
+ @Test
+ public void testPersistedSendCoreTX() throws Exception {
+ testPersistedSend("CORE", true, 100);
+ }
+
+ @Test
+ public void testPersistedSendCoreTXLarge() throws Exception {
+ testPersistedSend("CORE", true, 200 * 1024);
+ }
+
+ private void testPersistedSend(String protocol, boolean transactional, int
messageSize) throws Exception {
+ ReusableLatch sendPending = new ReusableLatch(0);
+ Semaphore semSend = new Semaphore(1);
+ Semaphore semAck = new Semaphore(1);
+ AtomicInteger errors = new AtomicInteger(0);
+
+ try {
+ final int NUMBER_OF_MESSAGES = 10;
+
+ AtomicInteger countStored = new AtomicInteger(0);
+
+ slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT,
SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("StorageCallback::slow isUpdate={}, isTX={},
txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType,
record);
+ }
+ if (transactional) {
+ if (isTX) {
+ try {
+ if (countStored.get() > 0) {
+ countStored.incrementAndGet();
+ logger.trace("semSend.tryAcquire");
+ if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+ logger.trace("acquired TX, now release");
+ semSend.release();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+ logger.debug("slow ACK REF");
+ try {
+ if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+ semAck.release();
+ logger.trace("slow acquired ACK semaphore");
+ } else {
+ logger.trace("Semaphore wasn't acquired");
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+ try {
+ countStored.incrementAndGet();
+ if (!transactional) {
+ logger.trace("semSend.tryAcquire");
+ if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+ logger.trace("acquired non TX now release");
+ semSend.release();
+ }
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ }
+ }
+ });
+ slowServer.setIdentity("slowServer");
+ server.setIdentity("server");
+
+ ExecutorService pool = Executors.newFixedThreadPool(5);
+ runAfter(pool::shutdown);
+
+ AMQPMirrorBrokerConnectionElement replication =
configureMirrorTowardsSlow(server);
+
+ slowServer.getConfiguration().setName("slow");
+ server.getConfiguration().setName("fast");
+ slowServer.start();
+ server.start();
+
+ waitForServerToStart(slowServer);
+ waitForServerToStart(server);
+
+ server.addAddressInfo(new
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+ server.createQueue(new
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+ Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+ Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:" + AMQP_PORT);
+
+ if (factory instanceof ActiveMQConnectionFactory) {
+ ((ActiveMQConnectionFactory)
factory).getServerLocator().setBlockOnAcknowledge(true);
+ }
+
+ Connection connection = factory.createConnection();
+ runAfter(connection::close);
+ Session session = connection.createSession(transactional,
transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(getQueueName()));
+
+ connection.start();
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ final String bodyMessage;
+ {
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < messageSize; i++) {
+ buffer.append("large Buffer...");
+ }
+ bodyMessage = buffer.toString();
+ }
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ if (logger.isTraceEnabled()) {
Review Comment:
Unneeded gate check
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext
context, List<MessageRef
try {
context.setReusable(false);
- MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
- String nodeID = setProtocolData(idSupplier, ref);
+ String nodeID = idSupplier.getServerID(message);
+
if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
logger.trace("Message {} already belonged to the node, {}, it
won't circle send", message, getRemoteMirrorId());
return;
}
+
+ MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
+ setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
snfQueue.refUp(ref);
+ List<MessageReference> refs;
+
+ if (tx != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Mirroring Message " + message + " with TX");
+ }
+ MirrorSendOperation mirrorSendOperation = getSendOperation(tx);
+ refs = mirrorSendOperation.mirroredRefs;
+ } else {
+ refs = new LinkedList<>();
+ }
+
refs.add(ref);
+ if (sync) {
+ OperationContext operContext =
OperationContextImpl.getContext(server.getExecutorFactory());
+ if (tx == null) {
+ // notice that if transactional, the context is lined up on
beforeCommit as part of the transaction operation
+ operContext.replicationLineUp();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("mirror syncUp context={}, ref={}", operContext,
ref);
+ }
+ ref.setProtocolData(OperationContext.class, operContext);
+ }
+
if (message.isDurable() && snfQueue.isDurable()) {
PostOfficeImpl.storeDurableReference(server.getStorageManager(),
message, context.getTransaction(), snfQueue, true);
}
+ if (tx == null) {
+ server.getStorageManager().afterStoreOperations(new IOCallback() {
+ @Override
+ public void done() {
+ PostOfficeImpl.processReferences(refs, false);
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+ });
+ }
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
+ private void syncDone(MessageReference reference) {
+ OperationContext ctx = reference.getProtocolData(OperationContext.class);
+ if (ctx != null) {
+ ctx.replicationDone();
+ if (logger.isDebugEnabled()) {
Review Comment:
Remove gate check.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -239,37 +249,107 @@ public void sendMessage(Message message, RoutingContext
context, List<MessageRef
try {
context.setReusable(false);
- MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
- String nodeID = setProtocolData(idSupplier, ref);
+ String nodeID = idSupplier.getServerID(message);
+
if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
logger.trace("Message {} already belonged to the node, {}, it
won't circle send", message, getRemoteMirrorId());
return;
}
+
+ MessageReference ref =
MessageReference.Factory.createReference(message, snfQueue);
+ setProtocolData(ref, nodeID, idSupplier.getID(ref));
+
snfQueue.refUp(ref);
+ List<MessageReference> refs;
+
+ if (tx != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Mirroring Message " + message + " with TX");
+ }
+ MirrorSendOperation mirrorSendOperation = getSendOperation(tx);
+ refs = mirrorSendOperation.mirroredRefs;
+ } else {
+ refs = new LinkedList<>();
Review Comment:
Seems like this could be made a Collections.singletonList instead of a
LinkedList, given that there is only ever going to be one entry from what I
can see; you would just need to move refs.add up into the other if block.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java:
##########
@@ -128,11 +133,17 @@ public void storeLineUp() {
@Override
public void replicationLineUp() {
REPLICATION_LINEUP_UPDATER.incrementAndGet(this);
+ if (logger.isTraceEnabled()) {
+ logger.trace("replicationLineUp:: {}", replicationLineUpField);
+ }
}
@Override
public synchronized void replicationDone() {
replicated++;
+ if (logger.isTraceEnabled()) {
Review Comment:
Unneeded gate check
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -303,8 +379,18 @@ private static Properties getProperties(Message message) {
}
}
+ private void postACKInternalMessage(MessageReference reference) {
+ if (logger.isDebugEnabled()) {
Review Comment:
Unnecessary gate check
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -107,6 +115,7 @@ public boolean isStarted() {
public AMQPMirrorControllerSource(ProtonProtocolManager
protonProtocolManager, Queue snfQueue, ActiveMQServer server,
AMQPMirrorBrokerConnectionElement replicaConfig,
AMQPBrokerConnection brokerConnection) {
super(server);
+ assert server != null;
Review Comment:
Generally in a constructor I'd rather the contract be explicitly tested with
Objects.requireNonNull(x) vs an assert
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+ Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+ private static final String SLOW_SERVER_NAME = "slow";
+ private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+ private ActiveMQServer slowServer;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,OPENWIRE,CORE";
+ }
+
+ @Test
+ public void testPersistedSendAMQP() throws Exception {
+ testPersistedSend("AMQP", false, 100);
+ }
+
+ @Test
+ public void testPersistedSendAMQPLarge() throws Exception {
+ testPersistedSend("AMQP", false, 200 * 1024);
+ }
+
+
+ @Test
+ public void testPersistedSendCore() throws Exception {
+ testPersistedSend("CORE", false, 100);
+ }
+
+ @Test
+ public void testPersistedSendCoreLarge() throws Exception {
+ testPersistedSend("CORE", false, 200 * 1024);
+ }
+
+ @Test
+ public void testPersistedSendAMQPTXLarge() throws Exception {
+ testPersistedSend("AMQP", true, 200 * 1024);
+ }
+
+ @Test
+ public void testPersistedSendAMQPTX() throws Exception {
+ testPersistedSend("AMQP", true, 100);
+ }
+
+ @Test
+ public void testPersistedSendCoreTX() throws Exception {
+ testPersistedSend("CORE", true, 100);
+ }
+
+ @Test
+ public void testPersistedSendCoreTXLarge() throws Exception {
+ testPersistedSend("CORE", true, 200 * 1024);
+ }
+
+ private void testPersistedSend(String protocol, boolean transactional, int
messageSize) throws Exception {
+ ReusableLatch sendPending = new ReusableLatch(0);
+ Semaphore semSend = new Semaphore(1);
+ Semaphore semAck = new Semaphore(1);
+ AtomicInteger errors = new AtomicInteger(0);
+
+ try {
+ final int NUMBER_OF_MESSAGES = 10;
+
+ AtomicInteger countStored = new AtomicInteger(0);
+
+ slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT,
SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("StorageCallback::slow isUpdate={}, isTX={},
txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType,
record);
+ }
+ if (transactional) {
+ if (isTX) {
+ try {
+ if (countStored.get() > 0) {
+ countStored.incrementAndGet();
+ logger.trace("semSend.tryAcquire");
+ if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+ logger.trace("acquired TX, now release");
+ semSend.release();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+ logger.debug("slow ACK REF");
+ try {
+ if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+ semAck.release();
+ logger.trace("slow acquired ACK semaphore");
+ } else {
+ logger.trace("Semaphore wasn't acquired");
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+ try {
+ countStored.incrementAndGet();
+ if (!transactional) {
+ logger.trace("semSend.tryAcquire");
+ if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+ logger.trace("acquired non TX now release");
+ semSend.release();
+ }
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ }
+ }
+ });
+ slowServer.setIdentity("slowServer");
+ server.setIdentity("server");
+
+ ExecutorService pool = Executors.newFixedThreadPool(5);
+ runAfter(pool::shutdown);
+
+ AMQPMirrorBrokerConnectionElement replication =
configureMirrorTowardsSlow(server);
+
+ slowServer.getConfiguration().setName("slow");
+ server.getConfiguration().setName("fast");
+ slowServer.start();
+ server.start();
+
+ waitForServerToStart(slowServer);
+ waitForServerToStart(server);
+
+ server.addAddressInfo(new
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+ server.createQueue(new
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+ Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+ Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:" + AMQP_PORT);
+
+ if (factory instanceof ActiveMQConnectionFactory) {
+ ((ActiveMQConnectionFactory)
factory).getServerLocator().setBlockOnAcknowledge(true);
+ }
+
+ Connection connection = factory.createConnection();
+ runAfter(connection::close);
+ Session session = connection.createSession(transactional,
transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(getQueueName()));
+
+ connection.start();
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ final String bodyMessage;
+ {
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < messageSize; i++) {
+ buffer.append("large Buffer...");
+ }
+ bodyMessage = buffer.toString();
+ }
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("===>>> send message {}", i);
+ }
+ int theI = i;
+ sendPending.countUp();
+ logger.trace("semSend.acquire");
+ semSend.acquire();
+ if (!transactional) {
+ pool.execute(() -> {
+ try {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Entering non TX send with sendPending =
" + sendPending.getCount());
+ }
+ TextMessage message =
session.createTextMessage(bodyMessage);
+ message.setStringProperty("strProperty", "" + theI);
+ producer.send(message);
+ sendPending.countDown();
+ if (logger.isTraceEnabled()) {
Review Comment:
Same as above: this gate check isn't needed.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSyncMirrorTest.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalUpdateCallback;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPSyncMirrorTest extends AmqpClientTestSupport {
+
+ Logger logger = LoggerFactory.getLogger(AMQPSyncMirrorTest.class);
+
+ private static final String SLOW_SERVER_NAME = "slow";
+ private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+ private ActiveMQServer slowServer;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,OPENWIRE,CORE";
+ }
+
+ @Test
+ public void testPersistedSendAMQP() throws Exception {
+ testPersistedSend("AMQP", false, 100);
+ }
+
+ @Test
+ public void testPersistedSendAMQPLarge() throws Exception {
+ testPersistedSend("AMQP", false, 200 * 1024);
+ }
+
+
+ @Test
+ public void testPersistedSendCore() throws Exception {
+ testPersistedSend("CORE", false, 100);
+ }
+
+ @Test
+ public void testPersistedSendCoreLarge() throws Exception {
+ testPersistedSend("CORE", false, 200 * 1024);
+ }
+
+ @Test
+ public void testPersistedSendAMQPTXLarge() throws Exception {
+ testPersistedSend("AMQP", true, 200 * 1024);
+ }
+
+ @Test
+ public void testPersistedSendAMQPTX() throws Exception {
+ testPersistedSend("AMQP", true, 100);
+ }
+
+ @Test
+ public void testPersistedSendCoreTX() throws Exception {
+ testPersistedSend("CORE", true, 100);
+ }
+
+ @Test
+ public void testPersistedSendCoreTXLarge() throws Exception {
+ testPersistedSend("CORE", true, 200 * 1024);
+ }
+
+ private void testPersistedSend(String protocol, boolean transactional, int
messageSize) throws Exception {
+ ReusableLatch sendPending = new ReusableLatch(0);
+ Semaphore semSend = new Semaphore(1);
+ Semaphore semAck = new Semaphore(1);
+ AtomicInteger errors = new AtomicInteger(0);
+
+ try {
+ final int NUMBER_OF_MESSAGES = 10;
+
+ AtomicInteger countStored = new AtomicInteger(0);
+
+ slowServer = createServerWithCallbackStorage(SLOW_SERVER_PORT,
SLOW_SERVER_NAME, (isUpdate, isTX, txId, id, recordType, persister, record) -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("StorageCallback::slow isUpdate={}, isTX={},
txID={}, id={},recordType={}, record={}", isUpdate, isTX, txId, id, recordType,
record);
+ }
+ if (transactional) {
+ if (isTX) {
+ try {
+ if (countStored.get() > 0) {
+ countStored.incrementAndGet();
+ logger.trace("semSend.tryAcquire");
+ if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+ logger.trace("acquired TX, now release");
+ semSend.release();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ if (recordType == JournalRecordIds.ACKNOWLEDGE_REF) {
+ logger.debug("slow ACK REF");
+ try {
+ if (semAck.tryAcquire(20, TimeUnit.SECONDS)) {
+ semAck.release();
+ logger.trace("slow acquired ACK semaphore");
+ } else {
+ logger.trace("Semaphore wasn't acquired");
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ if (recordType == JournalRecordIds.ADD_MESSAGE_PROTOCOL) {
+ try {
+ countStored.incrementAndGet();
+ if (!transactional) {
+ logger.trace("semSend.tryAcquire");
+ if (semSend.tryAcquire(20, TimeUnit.SECONDS)) {
+ logger.trace("acquired non TX now release");
+ semSend.release();
+ }
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ }
+ }
+ });
+ slowServer.setIdentity("slowServer");
+ server.setIdentity("server");
+
+ ExecutorService pool = Executors.newFixedThreadPool(5);
+ runAfter(pool::shutdown);
+
+ AMQPMirrorBrokerConnectionElement replication =
configureMirrorTowardsSlow(server);
+
+ slowServer.getConfiguration().setName("slow");
+ server.getConfiguration().setName("fast");
+ slowServer.start();
+ server.start();
+
+ waitForServerToStart(slowServer);
+ waitForServerToStart(server);
+
+ server.addAddressInfo(new
AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+ server.createQueue(new
QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+ Wait.waitFor(() -> slowServer.locateQueue(getQueueName()) != null);
+ Queue replicatedQueue = slowServer.locateQueue(getQueueName());
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:" + AMQP_PORT);
+
+ if (factory instanceof ActiveMQConnectionFactory) {
+ ((ActiveMQConnectionFactory)
factory).getServerLocator().setBlockOnAcknowledge(true);
+ }
+
+ Connection connection = factory.createConnection();
+ runAfter(connection::close);
+ Session session = connection.createSession(transactional,
transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(getQueueName()));
+
+ connection.start();
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ final String bodyMessage;
+ {
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < messageSize; i++) {
+ buffer.append("large Buffer...");
+ }
+ bodyMessage = buffer.toString();
+ }
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("===>>> send message {}", i);
+ }
+ int theI = i;
+ sendPending.countUp();
+ logger.trace("semSend.acquire");
+ semSend.acquire();
+ if (!transactional) {
+ pool.execute(() -> {
+ try {
+ if (logger.isTraceEnabled()) {
Review Comment:
Wouldn't need a gate check here if the trace used proper formatting.
Issue Time Tracking
-------------------
Worklog Id: (was: 840411)
Time Spent: 50m (was: 40m)
> Mirror sync replication
> -----------------------
>
> Key: ARTEMIS-4136
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4136
> Project: ActiveMQ Artemis
> Issue Type: New Feature
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.28.0
>
> Time Spent: 50m
> Remaining Estimate: 0h
>
> I'm adding a sync=true|false option to the mirror.
> It will be possible to configure a mirror like this:
> <broker-connections>
> <amqp-connection uri="tcp://test1:111" name="test1"
> retry-interval="333" reconnect-attempts="33" user="testuser"
> password="testpassword">
> <mirror sync="true"/>
> </amqp-connection>
> </broker-connections>
> If sync is set to true, any blocking client operation will wait for a mirror
> callback.
> With that option set, any blocking operation on the broker will wait for a
> mirror round trip:
> tx.commit(), session.send (non-transactional), and client.ack (when configured
> as sync).
> Note that in AMQP, client dispositions are always asynchronous; hence it is
> only possible to sync acks when using transactions with AMQP.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)