[ 
https://issues.apache.org/jira/browse/ARTEMIS-3243?focusedWorklogId=601274&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-601274
 ]

ASF GitHub Bot logged work on ARTEMIS-3243:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/May/21 16:45
            Start Date: 24/May/21 16:45
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on a change in pull request #3545:
URL: https://github.com/apache/activemq-artemis/pull/3545#discussion_r626589356



##########
File path: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
##########
@@ -233,6 +233,21 @@ public static void longToBytes(long x, byte[] output, int 
offset) {
       output[offset + 7] = (byte)(x);
    }
 
+   /** the byte ID goes to the first byte, everything else stays where they 
are. */
+   public static long mixByteAndLong(byte id, long longID) {

Review comment:
       Should it verify it isn't dropping/overwriting data within the original 
long while doing this, or is that the caller's responsibility?
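
   As a rough illustration of the kind of check being asked about, here is a 
   minimal sketch. It is not the ByteUtil code from the pull request: the class 
   name is hypothetical, and it assumes "first byte" means the most significant 
   byte of the long, so only 56 bits remain for the original value.

```java
// Hypothetical helper for illustration only; not the ByteUtil implementation in the PR.
final class MirrorIdPacking {

   // Assumption: the id occupies the most significant byte, leaving the low 56 bits for longID.
   private static final long LOW_56_BITS = 0x00FFFFFFFFFFFFFFL;

   /** Puts id into the top byte of longID, refusing to overwrite bits that are already set there. */
   static long mixByteAndLong(byte id, long longID) {
      if ((longID & ~LOW_56_BITS) != 0) {
         throw new IllegalArgumentException("longID already uses its first byte: " + longID);
      }
      // Mask with 0xFF so a negative byte does not sign-extend before shifting.
      return ((long) (id & 0xFF) << 56) | longID;
   }

   /** Extracts the byte stored in the top 8 bits. */
   static byte getFirstByte(long mixed) {
      return (byte) (mixed >>> 56);
   }

   /** Clears the top 8 bits, returning the original 56-bit value. */
   static long removeFirstByte(long mixed) {
      return mixed & LOW_56_BITS;
   }
}
```

   With a check like this, a negative or very large longID is rejected instead 
   of being silently altered. Whether that belongs in the utility or in the 
   caller is the open question.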

##########
File path: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java
##########
@@ -41,6 +41,11 @@
     */
    void setIDSupplier(ToLongFunction<T> supplier);
 
+   /**
+    * Use non zeroed IDs. 0 is reserved for null values and it should return 
always null.

Review comment:
       LinkedList#setIDSupplier(ToLongFunction) needs to be updated too; it says 
these values must be positive.

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -150,6 +172,17 @@ public void deleteQueue(SimpleString address, SimpleString 
queue) throws Excepti
 
    @Override
    public void sendMessage(Message message, RoutingContext context, 
List<MessageReference> refs) {
+      if (context.getMirrorSource() != null &&
+         context.getMirrorSource().getRemoteMirrorId() == 
this.getRemoteMirrorId()) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("server " + server + " is Rejecting ping pong on send 
" + message);

Review comment:
       'Rejecting ping pong on send' could be easy to misinterpret, 
particularly given the AMQP context.
   
   Perhaps something like 'Discarding send to avoid looping, matched remote 
mirror-id <id> for message <msg>'?

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -202,8 +235,34 @@ private static Properties getProperties(Message message) {
 
    @Override
    public void postAcknowledge(MessageReference ref, AckReason reason) throws 
Exception {
+
+      MirrorController targetController = getControllerTarget();
+
+      if (targetController != null || ref.getQueue() != null && 
(ref.getQueue().isInternalQueue() || ref.getQueue().isMirrorController())) {
+         logger.trace(server + " rejecting ping pong for postAcknowledge 
queue=" + ref.getQueue().getName() +

Review comment:
       Missing if (logger.isTraceEnabled()) {
   
   Similar comment about 'rejecting ping pong' as before.
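
   For reference, a minimal sketch of the guard pattern. It uses 
   java.util.logging so that it is self-contained (the broker code uses JBoss 
   Logging instead), and the class, method, and counter names are hypothetical. 
   The counter exists only to show that the message string is not built when 
   tracing is off.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical illustration; java.util.logging stands in for the JBoss Logging API used in the PR.
final class GuardedTraceSketch {

   static final Logger LOGGER = Logger.getLogger("GuardedTraceSketch");

   // Counts how often the (potentially expensive) message string is actually built.
   static final AtomicInteger MESSAGES_BUILT = new AtomicInteger();

   static String describe(Object ref) {
      MESSAGES_BUILT.incrementAndGet();
      return "Discarding postAcknowledge to avoid looping, ref=" + ref;
   }

   static void onAck(Object ref) {
      // The guard skips the string concatenation entirely when the level is disabled.
      if (LOGGER.isLoggable(Level.FINEST)) {
         LOGGER.finest(describe(ref));
      }
   }
}
```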

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
##########
@@ -322,17 +323,26 @@ public String toString() {
       // toString will only call ensureScanning on regular messages
       // as large messages might need to do extra work to parse it
       ensureScanning();
-      return super.toString();
+      String body = getStringBody();
+      return super.toString() + (body != null ? ",body=[" + body + "]" : "");
    }
 
    @Override
    public String getStringBody() {
       final Section body = getBody();
-      if (body instanceof AmqpValue && ((AmqpValue) body).getValue() 
instanceof String) {
-         return (String) ((AmqpValue) body).getValue();
-      } else {
-         return null;
+      if (body instanceof AmqpValue) {
+         AmqpValue sectionBody = (AmqpValue) body;
+         if (sectionBody != null) {
+            if (sectionBody.getValue() instanceof Long) {
+               long id = (long)sectionBody.getValue();
+               return sectionBody.getValue().toString() + ", byte[0]=" + 
ByteUtil.getFirstByte(id) + ", messageID=" + ByteUtil.removeFirstByte(id);
+            } else {
+               return sectionBody.getValue().toString();
+            }

Review comment:
       Related to above, is this change just for debugging or an intended 
permanent change?
   
   There is nothing to say a message with a long in the body is one of these 
new 'munged mirroring IDs' so it seems inappropriate to typically describe them 
as such; that's just going to confuse.
   
   With this change this 'getStringBody' method also goes from only returning a 
value if there was a string body value present, to returning the toString of 
any body type which would seem quite a drastic change.
   
   The else will also NPE if the body section contains a null value, as some 
will do.

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -490,12 +495,18 @@ private void connectSender(Queue queue,
             Source source = new Source();
             source.setAddress(queue.getAddress().toString());
             sender.setSource(source);
+            HashMap<Symbol, Object> mapProperties = new HashMap<>(1);

Review comment:
       Setting the initial size to 1 like this will mean it has to resize as 
soon as you use it. Use e.g. 2 or perhaps play with the load factor (which 
defaults to 0.75) if trying to avoid allocating the standard size, and avoid a 
resize on use.
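
   The arithmetic behind this can be sketched as follows. This models the 
   documented HashMap rule (the table is rehashed once the number of entries 
   exceeds capacity times load factor) together with the power-of-two rounding 
   and integer truncation that OpenJDK applies; the class and method names are 
   hypothetical, and it is only meant for small capacities.

```java
// Hypothetical model of HashMap sizing for small capacities; it does not inspect a real HashMap.
final class HashMapSizingSketch {

   /** Smallest power of two that is >= requested, which is how HashMap rounds initialCapacity. */
   static int tableSizeFor(int requested) {
      int size = 1;
      while (size < requested) {
         size <<= 1;
      }
      return size;
   }

   /** True if a single entry already exceeds the resize threshold (capacity * loadFactor, truncated). */
   static boolean resizesOnFirstPut(int initialCapacity, float loadFactor) {
      int threshold = (int) (tableSizeFor(initialCapacity) * loadFactor);
      return 1 > threshold;
   }
}
```

   Under this model, capacity 1 with the default load factor gives a threshold 
   of 0, so the first put triggers a resize. Capacity 2 gives a threshold of 1, 
   and capacity 1 with a load factor of 1.0 also avoids the resize for a single 
   entry.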

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -38,51 +36,148 @@
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import 
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.StringPrintStream;
+import org.apache.activemq.artemis.utils.pools.MpscPool;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.jboss.logging.Logger;
 
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADD_ADDRESS;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.CREATE_QUEUE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_START;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_END;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
 
 public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver 
implements MirrorController {
 
-   public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = 
SimpleString.toSimpleString(INTERNAL_ID.toString());
-
    private static final Logger logger = 
Logger.getLogger(AMQPMirrorControllerTarget.class);
 
-   final ActiveMQServer server;
+   private static ThreadLocal<MirrorController> controllerThreadLocal = new 
ThreadLocal<>();
+
+   public static void setControllerTarget(MirrorController controller) {
+      controllerThreadLocal.set(controller);
+   }
+
+   public static MirrorController getControllerTarget() {
+      return controllerThreadLocal.get();
+   }
+
+   class ACKMessage extends TransactionOperationAbstract implements 
IOCallback, Runnable {
+
+      Delivery delivery;
+
+      void reset() {
+         this.delivery = null;
+      }
+
+      ACKMessage setDelivery(Delivery delivery) {
+         this.delivery = delivery;
+         return this;
+      }
+
+      @Override
+      public void run() {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Delivery settling for " + delivery + ", context=" + 
delivery.getContext());
+         }
+         delivery.disposition(Accepted.getInstance());
+         settle(delivery);
+         connection.flush();
+         
AMQPMirrorControllerTarget.this.ackMessageMpscPool.release(ACKMessage.this);
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) throws Exception {
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         done();
+      }
+
+      @Override
+      public void done() {
+         connection.runNow(this);
+      }
+
+      @Override
+      public void onError(int errorCode, String errorMessage) {
+      }
+   }
+
+   // in a regular case we should not have more than amqpCredits on the pool, 
that's the max we would need
+   private final MpscPool<ACKMessage> ackMessageMpscPool = new 
MpscPool<>(amqpCredits, ACKMessage::reset,  () -> new ACKMessage());
 
    final RoutingContextImpl routingContext = new RoutingContextImpl(null);
 
-   Map<SimpleString, Map<SimpleString, QueueConfiguration>> scanAddresses;
+   final BasicMirrorController<Receiver> basicController;
+
+   final ActiveMQServer server;
+
+   final DuplicateIDCache duplicateIDCache;
+
+   private final ToLongFunction<MessageReference> referenceIDSupplier;
 
    public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
                                      AMQPConnectionContext connection,
                                      AMQPSessionContext protonSession,
                                      Receiver receiver,
                                      ActiveMQServer server) {
       super(sessionSPI, connection, protonSession, receiver);
+      this.basicController = new BasicMirrorController(server);
+      this.basicController.setLink(receiver);
       this.server = server;
+
+      // we ise the number of credits for the duplicate detection, as that 
means the maximum number of elements you can have pending
+      if (logger.isTraceEnabled()) {
+         logger.trace("Setting up Duplicate detection on " + 
ProtonProtocolManager.MIRROR_ADDRESS + " wth " + connection.getAmqpCredits() + 
" as that's the number of credits");

Review comment:
       with?

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -324,13 +326,13 @@ private static void 
uninstallMirrorController(AMQPMirrorBrokerConnectionElement
     *  It is returning the snfQueue to the replica, and I needed isolation 
from the actual instance.
     *  During development I had a mistake where I used a property from the 
Object,
     *  so, I needed this isolation for my organization and making sure nothing 
would be shared. */
-   private static Queue installMirrorController(AMQPBrokerConnection 
brokerConnection, AMQPMirrorBrokerConnectionElement replicaConfig, 
ActiveMQServer server) throws Exception {
+   private Queue installMirrorController(AMQPMirrorBrokerConnectionElement 
replicaConfig, ActiveMQServer server) throws Exception {

Review comment:
       The comment for the method seems to be describing why this method is 
static, which it isn't after this change, so the comment needs to be removed or 
updated.

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
##########
@@ -322,17 +323,26 @@ public String toString() {
       // toString will only call ensureScanning on regular messages
       // as large messages might need to do extra work to parse it
       ensureScanning();
-      return super.toString();
+      String body = getStringBody();
+      return super.toString() + (body != null ? ",body=[" + body + "]" : "");

Review comment:
       Returning potentially the entire body in the toString seems 
inadvisable. Is this just for debug?

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -170,16 +203,16 @@ public void sendMessage(Message message, RoutingContext 
context, List<MessageRef
       }
    }
 
-   public static void validateProtocolData(MessageReference ref, SimpleString 
snfAddress) {
+   public static void validateProtocolData(ActiveMQServer server, 
MessageReference ref, SimpleString snfAddress) {
       if (ref.getProtocolData() == null && 
!ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
-         setProtocolData(ref);
+         setProtocolData(server, ref);
       }
    }
 
-   private static void setProtocolData(MessageReference ref) {
+   private static void setProtocolData(ActiveMQServer server, MessageReference 
ref) {
       Map<Symbol, Object> daMap = new HashMap<>();
       DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);
-      daMap.put(INTERNAL_ID, ref.getMessage().getMessageID());
+      daMap.put(INTERNAL_ID, 
ByteUtil.mixByteAndLong((byte)server.getMirrorBrokerId(), 
ref.getMessage().getMessageID()));

Review comment:
       Just using separate annotations seems like it would be simpler and less 
error prone. There is no validation that this mixing doesn't break the id value.
   
   There are lots of places converting this back or forth, some places even 
convert it one way then back the other for logging. Using a specific broker id 
annotation seems like it would be simpler.
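
   A minimal sketch of what the separate-annotation approach could look like. 
   Plain String keys stand in for the proton Symbol keys, and both key names 
   and the class name are hypothetical, not the constants defined in the pull 
   request.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: the message id and the broker id travel as two independent entries.
final class SeparateAnnotationsSketch {

   // Assumed key names for illustration only.
   static final String INTERNAL_ID = "x-hypothetical-internal-id";
   static final String BROKER_ID = "x-hypothetical-broker-id";

   static Map<String, Object> annotate(byte brokerId, long messageId) {
      Map<String, Object> annotations = new HashMap<>(4);
      annotations.put(INTERNAL_ID, messageId); // full 64-bit id, nothing masked or shifted
      annotations.put(BROKER_ID, brokerId);
      return annotations;
   }
}
```

   Because the id is never combined with the broker id, no value range is lost 
   and nothing has to be converted back before logging or lookup.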

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -490,12 +495,18 @@ private void connectSender(Queue queue,
             Source source = new Source();
             source.setAddress(queue.getAddress().toString());
             sender.setSource(source);
+            HashMap<Symbol, Object> mapProperties = new HashMap<>(1);
+            mapProperties.put(AMQPMirrorControllerSource.BROKER_ID, 
server.getMirrorBrokerId());

Review comment:
       Would restricting it to 127 rather than 255 allowed values and just 
using a byte be simpler than using a short as it does here and then needing to 
convert it where it is actually used later, which is primarily as a byte it 
seems?
   
   (or, since there are only 255 values being allowed, could still just use a 
byte and use negatives for the other IDs whenever they are really needed, which 
doesn't seem likely to be all that often)
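
   To illustrate the second option, a minimal sketch of carrying an ID in a 
   single signed byte and reading it back as an unsigned value. The class and 
   method names are hypothetical, and the 0 to 255 range is an assumption about 
   which IDs are allowed.

```java
// Hypothetical illustration of storing a small broker id in one byte.
final class BrokerIdByteSketch {

   /** Narrows an id in the assumed range 0..255 to a byte; values above 127 become negative bytes. */
   static byte toWire(int brokerId) {
      if (brokerId < 0 || brokerId > 255) {
         throw new IllegalArgumentException("broker id out of range: " + brokerId);
      }
      return (byte) brokerId;
   }

   /** Recovers the original 0..255 value from the signed byte. */
   static int fromWire(byte wire) {
      return Byte.toUnsignedInt(wire);
   }
}
```

   An ID such as 200 is stored as the byte value -56 and read back as 200, so 
   the byte can be used directly wherever it is needed and only widened where 
   the numeric value matters.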

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -202,8 +235,34 @@ private static Properties getProperties(Message message) {
 
    @Override
    public void postAcknowledge(MessageReference ref, AckReason reason) throws 
Exception {
+
+      MirrorController targetController = getControllerTarget();
+
+      if (targetController != null || ref.getQueue() != null && 
(ref.getQueue().isInternalQueue() || ref.getQueue().isMirrorController())) {
+         logger.trace(server + " rejecting ping pong for postAcknowledge 
queue=" + ref.getQueue().getName() +
+                         ", ref=" + ref);
+         return;
+      }
+
+      if (logger.isTraceEnabled()) {
+         logger.trace(server + " postAcknowledge " + ref);
+      }
+
       if (acks && !ref.getQueue().isMirrorController()) { // we don't call 
postACK on snfqueues, otherwise we would get infinite loop because of this 
feedback
-         Message message = createMessage(ref.getQueue().getAddress(), 
ref.getQueue().getName(), POST_ACK, ref.getMessage().getMessageID());
+         Long internalIDObject = 
(Long)ref.getMessage().getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
+         long internalID;
+         if (internalIDObject == null) {
+            internalID = ByteUtil.mixByteAndLong((byte)localMirrorId, 
ref.getMessageID());
+         } else {
+            internalID = internalIDObject.longValue();
+            if (logger.isTraceEnabled()) {
+               logger.trace("server " + server + "postAcknowledge on " + 
server + " acking message " + ref);

Review comment:
       "server" is referenced twice in the log message.

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -38,51 +36,148 @@
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import 
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.StringPrintStream;
+import org.apache.activemq.artemis.utils.pools.MpscPool;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.jboss.logging.Logger;
 
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADD_ADDRESS;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.CREATE_QUEUE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_START;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_END;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
 
 public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver 
implements MirrorController {
 
-   public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = 
SimpleString.toSimpleString(INTERNAL_ID.toString());
-
    private static final Logger logger = 
Logger.getLogger(AMQPMirrorControllerTarget.class);
 
-   final ActiveMQServer server;
+   private static ThreadLocal<MirrorController> controllerThreadLocal = new 
ThreadLocal<>();
+
+   public static void setControllerTarget(MirrorController controller) {
+      controllerThreadLocal.set(controller);
+   }
+
+   public static MirrorController getControllerTarget() {
+      return controllerThreadLocal.get();
+   }
+
+   class ACKMessage extends TransactionOperationAbstract implements 
IOCallback, Runnable {
+
+      Delivery delivery;
+
+      void reset() {
+         this.delivery = null;
+      }
+
+      ACKMessage setDelivery(Delivery delivery) {
+         this.delivery = delivery;
+         return this;
+      }
+
+      @Override
+      public void run() {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Delivery settling for " + delivery + ", context=" + 
delivery.getContext());
+         }
+         delivery.disposition(Accepted.getInstance());
+         settle(delivery);
+         connection.flush();
+         
AMQPMirrorControllerTarget.this.ackMessageMpscPool.release(ACKMessage.this);
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) throws Exception {
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         done();
+      }
+
+      @Override
+      public void done() {
+         connection.runNow(this);
+      }
+
+      @Override
+      public void onError(int errorCode, String errorMessage) {
+      }
+   }
+
+   // in a regular case we should not have more than amqpCredits on the pool, 
that's the max we would need
+   private final MpscPool<ACKMessage> ackMessageMpscPool = new 
MpscPool<>(amqpCredits, ACKMessage::reset,  () -> new ACKMessage());
 
    final RoutingContextImpl routingContext = new RoutingContextImpl(null);
 
-   Map<SimpleString, Map<SimpleString, QueueConfiguration>> scanAddresses;
+   final BasicMirrorController<Receiver> basicController;
+
+   final ActiveMQServer server;
+
+   final DuplicateIDCache duplicateIDCache;
+
+   private final ToLongFunction<MessageReference> referenceIDSupplier;
 
    public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
                                      AMQPConnectionContext connection,
                                      AMQPSessionContext protonSession,
                                      Receiver receiver,
                                      ActiveMQServer server) {
       super(sessionSPI, connection, protonSession, receiver);
+      this.basicController = new BasicMirrorController(server);
+      this.basicController.setLink(receiver);
       this.server = server;
+
+      // we ise the number of credits for the duplicate detection, as that 
means the maximum number of elements you can have pending

Review comment:
       use?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 601274)
    Time Spent: 1h  (was: 50m)

> Enhance AMQP Mirror support with dual mirror
> --------------------------------------------
>
>                 Key: ARTEMIS-3243
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3243
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.17.0
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.18.0
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> In the current Mirror version, we can only mirror in a single direction.
> With this enhancement, the two (or more) brokers would be connected to each 
> other, each one having its own ID, and each one would send updates to the 
> other broker.
> The outcome is that if you just transferred producers and consumers from one 
> broker to the other, the fallback would be automatic and simple. No need to 
> disable and enable mirror options.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
