[ 
https://issues.apache.org/jira/browse/ARTEMIS-3243?focusedWorklogId=620148&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-620148
 ]

ASF GitHub Bot logged work on ARTEMIS-3243:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Jul/21 18:48
            Start Date: 07/Jul/21 18:48
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on a change in pull request #3633:
URL: https://github.com/apache/activemq-artemis/pull/3633#discussion_r665625085



##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -38,51 +35,146 @@
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.collections.NodeStore;
+import org.apache.activemq.artemis.utils.pools.MpscPool;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.jboss.logging.Logger;
 
-import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
 import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS;
-import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
-import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
-import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
 import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADD_ADDRESS;
-import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
+import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.BROKER_ID;
 import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.CREATE_QUEUE;
+import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
 import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
+import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
 import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
-import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_START;
-import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_END;
+import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
+import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
+import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
 
 public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
 
-   public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(INTERNAL_ID.toString());
-
    private static final Logger logger = Logger.getLogger(AMQPMirrorControllerTarget.class);
 
-   final ActiveMQServer server;
+   private static ThreadLocal<MirrorController> controllerThreadLocal = new ThreadLocal<>();
+
+   public static void setControllerTarget(MirrorController controller) {
+      controllerThreadLocal.set(controller);
+   }
+
+   public static MirrorController getControllerTarget() {
+      return controllerThreadLocal.get();
+   }
+
+   /** Objects of this class can be used by either transaction or by OperationContext.
+    *  It is important that when you're using the transactions you clear any references to
+    *  the operation context. Don't use transaction and OperationContext at the same time
+    *  as that would generate duplicates on the objects cache.
+    */
+   class ACKMessageOperation implements IOCallback, Runnable {
+
+      Delivery delivery;
+
+      /** notice that when you use the Transaction, you need to make sure you don't use the IO*/
+      public TransactionOperationAbstract tx = new TransactionOperationAbstract() {
+         @Override
+         public void afterCommit(Transaction tx) {
+            completeOperation();
+         }
+      };
+
+      void reset() {
+         this.delivery = null;
+      }
+
+      ACKMessageOperation setDelivery(Delivery delivery) {
+         this.delivery = delivery;
+         return this;
+      }
+
+      @Override
+      public void run() {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Delivery settling for " + delivery + ", context=" + delivery.getContext());
+         }
+         delivery.disposition(Accepted.getInstance());
+         settle(delivery);
+         connection.flush();
+         AMQPMirrorControllerTarget.this.ackMessageMpscPool.release(ACKMessageOperation.this);
+      }
+
+      @Override
+      public void done() {
+         completeOperation();
+      }
+
+      private void completeOperation() {
+         connection.runNow(ACKMessageOperation.this);
+      }
+
+      @Override
+      public void onError(int errorCode, String errorMessage) {
+            logger.warn(errorCode + "-"  + errorMessage);
+      }
+   }
+
+   // in a regular case we should not have more than amqpCredits on the pool, that's the max we would need
+   private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new MpscPool<>(amqpCredits, ACKMessageOperation::reset, () -> new ACKMessageOperation());
 
    final RoutingContextImpl routingContext = new RoutingContextImpl(null);
 
-   Map<SimpleString, Map<SimpleString, QueueConfiguration>> scanAddresses;
+   final BasicMirrorController<Receiver> basicController;
+
+   final DuplicateIDGenerator duplicateIDGenerator = new DuplicateIDGenerator();
+
+   final ActiveMQServer server;
+
+   final DuplicateIDCache duplicateIDCache;
+
+   private final NodeStore<MessageReference> referenceNodeStore;
 
    public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
                                      AMQPConnectionContext connection,
                                      AMQPSessionContext protonSession,
                                      Receiver receiver,
                                      ActiveMQServer server) {
       super(sessionSPI, connection, protonSession, receiver);
+      this.basicController = new BasicMirrorController(server);
+      this.basicController.setLink(receiver);
       this.server = server;
+      this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
+
+      // we use the number of credits for the duplicate detection, as that means the maximum number of elements you can have pending
+      if (logger.isTraceEnabled()) {
+         logger.trace("Setting up duplicate detection cache on " + ProtonProtocolManager.MIRROR_ADDRESS + " with " + connection.getAmqpCredits() + " elements, being the number of credits");
+      }
+      duplicateIDCache = server.getPostOffice().getDuplicateIDCache(SimpleString.toSimpleString(ProtonProtocolManager.MIRROR_ADDRESS), connection.getAmqpCredits());

Review comment:
       @gemmellr you don't need more than "credits" entries per broker ID connected to a broker.
   
   I have been testing this extensively, and a test that repeatedly reconnects and kills a target never loses a message.
   
   This is about in-flight messages, and you cannot have more than "credits" messages pending.
   
   A duplicate cache of the same size as credits should be more than enough. I could play it safe and use 2x credits, though. I have thought about that, but the tests I have been running never reached a point where I needed it.
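
To illustrate the sizing argument above, here is a minimal, self-contained sketch. It is not the Artemis DuplicateIDCache; the class name CreditSizedDuplicateCache and its methods are hypothetical and exist only for this example. It assumes the sender can have at most "credits" unsettled messages, so after a reconnect it can redeliver at most that many, and a cache holding the last "credits" IDs is enough to catch every redelivery.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;

// Hypothetical illustration only: a bounded, insertion-ordered set of message IDs.
// The capacity plays the role of the AMQP link credits.
class CreditSizedDuplicateCache {

   private final int capacity;
   private final LinkedHashSet<Long> ids = new LinkedHashSet<>();

   CreditSizedDuplicateCache(int capacity) {
      if (capacity <= 0) {
         throw new IllegalArgumentException("capacity must be positive");
      }
      this.capacity = capacity;
   }

   /** Returns true if the id was not seen (and records it), false if it is a duplicate. */
   boolean addIfAbsent(long id) {
      if (ids.contains(id)) {
         return false;
      }
      if (ids.size() == capacity) {
         // evict the oldest id to keep the cache bounded
         Iterator<Long> oldest = ids.iterator();
         oldest.next();
         oldest.remove();
      }
      ids.add(id);
      return true;
   }

   int size() {
      return ids.size();
   }

   public static void main(String[] args) {
      int credits = 3; // assumed credit window for the demo
      CreditSizedDuplicateCache cache = new CreditSizedDuplicateCache(credits);

      // first delivery of the in-flight window
      for (long id = 1; id <= credits; id++) {
         System.out.println("first delivery " + id + " accepted=" + cache.addIfAbsent(id));
      }
      // simulated reconnect: the same window is redelivered
      for (long id = 1; id <= credits; id++) {
         System.out.println("redelivery " + id + " accepted=" + cache.addIfAbsent(id));
      }
   }
}
```

The sketch only protects a window of the last "credits" IDs: once more than that many newer IDs have been recorded, the oldest one is evicted and would no longer be detected. That is the case a 2x credits cache would add margin for.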
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 620148)
    Time Spent: 18h 50m  (was: 18h 40m)

> Enhance AMQP Mirror support with dual mirror
> --------------------------------------------
>
>                 Key: ARTEMIS-3243
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3243
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.17.0
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.18.0
>
>          Time Spent: 18h 50m
>  Remaining Estimate: 0h
>
> In the current Mirror version, we can only mirror in a single direction.
> With this enhancement, two (or more) brokers would be connected to each 
> other, each one having its own ID, and each one would send updates to the 
> other broker.
> The outcome is that if you just transferred producers and consumers from one 
> broker to the other, the fallback would be automatic and simple. No need to 
> disable and enable mirror options.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
