[
https://issues.apache.org/jira/browse/ARTEMIS-3243?focusedWorklogId=620052&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-620052
]
ASF GitHub Bot logged work on ARTEMIS-3243:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Jul/21 16:04
Start Date: 07/Jul/21 16:04
Worklog Time Spent: 10m
Work Description: gemmellr commented on a change in pull request #3633:
URL: https://github.com/apache/activemq-artemis/pull/3633#discussion_r665503640
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -38,51 +35,146 @@
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
+import
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import
org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.collections.NodeStore;
+import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADD_ADDRESS;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.BROKER_ID;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.CREATE_QUEUE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_START;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_END;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver
implements MirrorController {
- public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY =
SimpleString.toSimpleString(INTERNAL_ID.toString());
-
private static final Logger logger =
Logger.getLogger(AMQPMirrorControllerTarget.class);
- final ActiveMQServer server;
+ private static ThreadLocal<MirrorController> controllerThreadLocal = new
ThreadLocal<>();
+
+ public static void setControllerTarget(MirrorController controller) {
+ controllerThreadLocal.set(controller);
+ }
+
+ public static MirrorController getControllerTarget() {
+ return controllerThreadLocal.get();
+ }
+
+ /** Objects of this class can be used by either transaction or by
OperationContext.
+ * It is important that when you're using the transactions you clear any
references to
+ * the operation context. Don't use transaction and OperationContext at
the same time
+ * as that would generate duplicates on the objects cache.
+ */
+ class ACKMessageOperation implements IOCallback, Runnable {
+
+ Delivery delivery;
+
+ /** notice that when you use the Transaction, you need to make sure you
don't use the IO*/
+ public TransactionOperationAbstract tx = new
TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ completeOperation();
+ }
+ };
+
+ void reset() {
+ this.delivery = null;
+ }
+
+ ACKMessageOperation setDelivery(Delivery delivery) {
+ this.delivery = delivery;
+ return this;
+ }
+
+ @Override
+ public void run() {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Delivery settling for " + delivery + ", context=" +
delivery.getContext());
+ }
+ delivery.disposition(Accepted.getInstance());
+ settle(delivery);
+ connection.flush();
+
AMQPMirrorControllerTarget.this.ackMessageMpscPool.release(ACKMessageOperation.this);
+ }
+
+ @Override
+ public void done() {
+ completeOperation();
+ }
+
+ private void completeOperation() {
+ connection.runNow(ACKMessageOperation.this);
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ logger.warn(errorMessage + "-" + errorMessage);
+ }
+ }
+
+ // in a regular case we should not have more than amqpCredits on the pool,
that's the max we would need
+ private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new
MpscPool<>(amqpCredits, ACKMessageOperation::reset, () -> new
ACKMessageOperation());
final RoutingContextImpl routingContext = new RoutingContextImpl(null);
- Map<SimpleString, Map<SimpleString, QueueConfiguration>> scanAddresses;
+ final BasicMirrorController<Receiver> basicController;
+
+ final DuplicateIDGenerator duplicateIDGenerator = new
DuplicateIDGenerator();
+
+ final ActiveMQServer server;
+
+ final DuplicateIDCache duplicateIDCache;
+
+ private final NodeStore<MessageReference> referenceNodeStore;
public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
AMQPSessionContext protonSession,
Receiver receiver,
ActiveMQServer server) {
super(sessionSPI, connection, protonSession, receiver);
+ this.basicController = new BasicMirrorController(server);
+ this.basicController.setLink(receiver);
this.server = server;
+ this.referenceNodeStore =
sessionSPI.getProtocolManager().getReferenceIDSupplier();
+
+ // we use the number of credits for the duplicate detection, as that
means the maximum number of elements you can have pending
+ if (logger.isTraceEnabled()) {
+ logger.trace("Setting up duplicate detection cache on " +
ProtonProtocolManager.MIRROR_ADDRESS + " wth " + connection.getAmqpCredits() +
" elements, being the number of credits");
+ }
+ duplicateIDCache =
server.getPostOffice().getDuplicateIDCache(SimpleString.toSimpleString(ProtonProtocolManager.MIRROR_ADDRESS),
connection.getAmqpCredits());
Review comment:
Is this cache a shared instance used across all 'controller target'
links? Seems like it is. If so might it be better to have more than 'credits'
entries here since the more mirror links there are, the more messages possible
and the less effective this will become, being divided between them all.
Seems from the docs that the default 'id-cache-size' is 20000
normally...whilst this will end up with 1000 in comparison and only be
increasable by making the disparity worse by giving every link yet more credit.
Might be better to unlink it from 'credit' entirely.
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -267,102 +322,151 @@ public void deleteAddress(AddressInfo addressInfo)
throws Exception {
@Override
public void createQueue(QueueConfiguration queueConfiguration) throws
Exception {
if (logger.isDebugEnabled()) {
- logger.debug("Adding queue " + queueConfiguration);
+ logger.debug(server + " Adding queue " + queueConfiguration);
}
server.createQueue(queueConfiguration, true);
-
- if (scanAddresses != null) {
-
getQueueScanMap(queueConfiguration.getAddress()).put(queueConfiguration.getName(),
queueConfiguration);
- }
}
@Override
public void deleteQueue(SimpleString addressName, SimpleString queueName)
throws Exception {
if (logger.isDebugEnabled()) {
- logger.debug("destroy queue " + queueName + " on address = " +
addressName);
+ logger.debug(server + " destroy queue " + queueName + " on address =
" + addressName + " server " + server.getIdentity());
}
try {
- server.destroyQueue(queueName);
+ server.destroyQueue(queueName,null, false, true, false, false);
} catch (ActiveMQNonExistentQueueException expected) {
- logger.debug("queue " + queueName + " was previously removed",
expected);
+ logger.debug(server + " queue " + queueName + " was previously
removed", expected);
}
}
- private static ToLongFunction<MessageReference> referenceIDSupplier =
(source) -> {
- Long id = (Long)
source.getMessage().getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
- if (id == null) {
- return -1;
- } else {
- return id;
+ public boolean postAcknowledge(String address, String queue, String nodeID,
long messageID, ACKMessageOperation ackMessage) throws Exception {
+ final Queue targetQueue = server.locateQueue(queue);
+
+ if (targetQueue == null) {
+ logger.warn("Queue " + queue + " not found on mirror target, ignoring
ack for queue=" + queue + ", messageID=" + messageID + ", nodeID=" + nodeID);
+ return false;
}
- };
- public void postAcknowledge(String address, String queue, long messageID) {
if (logger.isDebugEnabled()) {
- logger.debug("post acking " + address + ", queue = " + queue + ",
messageID = " + messageID);
+ // we only do the following check if debug
+ if (targetQueue.getConsumerCount() > 0) {
+ logger.debug("server " + server.getIdentity() + ", queue " +
targetQueue.getName() + " has consumers while delivering ack for " + messageID);
+ }
}
- Queue targetQueue = server.locateQueue(queue);
- if (targetQueue != null) {
- MessageReference reference =
targetQueue.removeWithSuppliedID(messageID, referenceIDSupplier);
- if (reference != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("Acking reference " + reference);
- }
- try {
- targetQueue.acknowledge(reference);
- } catch (Exception e) {
- // TODO anything else I can do here?
- // such as close the connection with error?
- logger.warn(e.getMessage(), e);
- }
- } else {
- if (logger.isTraceEnabled()) {
- logger.trace("There is no reference to ack on " + messageID);
- }
+ if (logger.isTraceEnabled()) {
+ logger.trace("Server " + server.getIdentity() + " with queue = " +
queue + " being acked for " + messageID + " coming from " + messageID + "
targetQueue = " + targetQueue);
+ }
+
+ performAck(nodeID, messageID, targetQueue, ackMessage, true);
+ return true;
+
+ }
+
+ private void performAck(String nodeID, long messageID, Queue targetQueue,
ACKMessageOperation ackMessageOperation, boolean retry) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("performAck (nodeID=" + nodeID + ", messageID=" +
messageID + ")" + ", targetQueue=" + targetQueue.getName());
+ }
+ MessageReference reference = targetQueue.removeWithSuppliedID(nodeID,
messageID, referenceNodeStore);
+ if (reference == null && retry) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Retrying Reference not found on messageID=" +
messageID + " nodeID=" + nodeID);
+ }
+ targetQueue.flushOnIntermediate(() -> {
+ recoverContext();
+ performAck(nodeID, messageID, targetQueue, ackMessageOperation,
false);
+ });
+ return;
+ }
+ if (reference != null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Post ack Server " + server + " worked well for
messageID=" + messageID + " nodeID=" + nodeID);
+ }
+ try {
+ targetQueue.acknowledge(reference);
+
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Post ack Server " + server + " could not find
messageID = " + messageID +
+ " representing nodeID=" + nodeID);
}
}
}
- private void sendMessage(AMQPMessage message) throws Exception {
+ /**
+ * this method returning true means the sendMessage was successful, and the
IOContext should no longer be used.
+ * as the sendMessage was successful the OperationContext of the
transaction will take care of the completion.
+ * The caller of this method should give up any reference to
messageCompletionAck when this method returns true.
+ * */
+ private boolean sendMessage(AMQPMessage message, ACKMessageOperation
messageCompletionAck) throws Exception {
+
if (message.getMessageID() <= 0) {
message.setMessageID(server.getStorageManager().generateID());
}
- Long internalID = (Long)
AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_ID);
+ String internalMirrorID =
(String)AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message,
BROKER_ID);
+ if (internalMirrorID == null) {
+ internalMirrorID = getRemoteMirrorId(); // not pasisng the ID means
the data was generated on the remote broker
+ }
+ Long internalIDLong = (Long)
AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_ID);
String internalAddress = (String)
AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message,
INTERNAL_DESTINATION);
- if (internalID != null) {
- message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
+ long internalID = 0;
+
+ if (internalIDLong != null) {
+ internalID = internalIDLong;
+ }
+
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("sendMessage on server " + server + " for message " +
message +
+ " with internalID = " + internalIDLong + " mirror id
" + internalMirrorID);
+ }
+
+
+ routingContext.setDuplicateDetection(false); // we do our own duplicate
detection here
+
+ byte[] duplicateIDBytes =
duplicateIDGenerator.generateDuplicateID(internalMirrorID, internalID);
+
+ if (duplicateIDCache.contains(duplicateIDBytes)) {
Review comment:
Do we need to consider the 'internalAddress' value as well if it exists?
(Not clear when it does exactly)
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -38,51 +35,146 @@
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
+import
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import
org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.collections.NodeStore;
+import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADD_ADDRESS;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.BROKER_ID;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.CREATE_QUEUE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_START;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_END;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver
implements MirrorController {
- public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY =
SimpleString.toSimpleString(INTERNAL_ID.toString());
-
private static final Logger logger =
Logger.getLogger(AMQPMirrorControllerTarget.class);
- final ActiveMQServer server;
+ private static ThreadLocal<MirrorController> controllerThreadLocal = new
ThreadLocal<>();
+
+ public static void setControllerTarget(MirrorController controller) {
+ controllerThreadLocal.set(controller);
+ }
+
+ public static MirrorController getControllerTarget() {
+ return controllerThreadLocal.get();
+ }
+
+ /** Objects of this class can be used by either transaction or by
OperationContext.
+ * It is important that when you're using the transactions you clear any
references to
+ * the operation context. Don't use transaction and OperationContext at
the same time
+ * as that would generate duplicates on the objects cache.
+ */
+ class ACKMessageOperation implements IOCallback, Runnable {
+
+ Delivery delivery;
+
+ /** notice that when you use the Transaction, you need to make sure you
don't use the IO*/
+ public TransactionOperationAbstract tx = new
TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ completeOperation();
+ }
+ };
+
+ void reset() {
+ this.delivery = null;
+ }
+
+ ACKMessageOperation setDelivery(Delivery delivery) {
+ this.delivery = delivery;
+ return this;
+ }
+
+ @Override
+ public void run() {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Delivery settling for " + delivery + ", context=" +
delivery.getContext());
+ }
+ delivery.disposition(Accepted.getInstance());
+ settle(delivery);
+ connection.flush();
+
AMQPMirrorControllerTarget.this.ackMessageMpscPool.release(ACKMessageOperation.this);
+ }
+
+ @Override
+ public void done() {
+ completeOperation();
+ }
+
+ private void completeOperation() {
+ connection.runNow(ACKMessageOperation.this);
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ logger.warn(errorMessage + "-" + errorMessage);
+ }
+ }
+
+ // in a regular case we should not have more than amqpCredits on the pool,
that's the max we would need
+ private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new
MpscPool<>(amqpCredits, ACKMessageOperation::reset, () -> new
ACKMessageOperation());
final RoutingContextImpl routingContext = new RoutingContextImpl(null);
- Map<SimpleString, Map<SimpleString, QueueConfiguration>> scanAddresses;
+ final BasicMirrorController<Receiver> basicController;
+
+ final DuplicateIDGenerator duplicateIDGenerator = new
DuplicateIDGenerator();
+
+ final ActiveMQServer server;
+
+ final DuplicateIDCache duplicateIDCache;
+
+ private final NodeStore<MessageReference> referenceNodeStore;
public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
AMQPSessionContext protonSession,
Receiver receiver,
ActiveMQServer server) {
super(sessionSPI, connection, protonSession, receiver);
+ this.basicController = new BasicMirrorController(server);
+ this.basicController.setLink(receiver);
this.server = server;
+ this.referenceNodeStore =
sessionSPI.getProtocolManager().getReferenceIDSupplier();
+
+ // we use the number of credits for the duplicate detection, as that
means the maximum number of elements you can have pending
+ if (logger.isTraceEnabled()) {
+ logger.trace("Setting up duplicate detection cache on " +
ProtonProtocolManager.MIRROR_ADDRESS + " wth " + connection.getAmqpCredits() +
" elements, being the number of credits");
Review comment:
" wth " -> " w**i**th "
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 620052)
Time Spent: 18h (was: 17h 50m)
> Enhance AMQP Mirror support with dual mirror
> --------------------------------------------
>
> Key: ARTEMIS-3243
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3243
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.17.0
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.18.0
>
> Time Spent: 18h
> Remaining Estimate: 0h
>
> at the current Mirror version, we can only mirror into a single direction.
> With this enhancement the two (or more brokers) would be connected to each
> other, each one having its own ID, and each one would send updates to the
> other broker.
> The outcome is that if you just transferred producers and consumers from one
> broker into the other, the fallback would be automatic and simple. No need to
> disable and enable mirror options.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)