[
https://issues.apache.org/jira/browse/ARTEMIS-3243?focusedWorklogId=620151&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-620151
]
ASF GitHub Bot logged work on ARTEMIS-3243:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Jul/21 18:51
Start Date: 07/Jul/21 18:51
Worklog Time Spent: 10m
Work Description: clebertsuconic commented on a change in pull request
#3633:
URL: https://github.com/apache/activemq-artemis/pull/3633#discussion_r665627054
##########
File path:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -38,51 +35,136 @@
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
+import
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import
org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.collections.IDSupplier;
+import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADD_ADDRESS;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.BROKER_ID;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.CREATE_QUEUE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_START;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_END;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver
implements MirrorController {
- public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY =
SimpleString.toSimpleString(INTERNAL_ID.toString());
-
private static final Logger logger =
Logger.getLogger(AMQPMirrorControllerTarget.class);
- final ActiveMQServer server;
+ private static ThreadLocal<MirrorController> controllerThreadLocal = new
ThreadLocal<>();
+
+ public static void setControllerTarget(MirrorController controller) {
+ controllerThreadLocal.set(controller);
+ }
+
+ public static MirrorController getControllerTarget() {
+ return controllerThreadLocal.get();
+ }
+
+ class ACKMessageOperation extends TransactionOperationAbstract implements
IOCallback, Runnable {
+
+ Delivery delivery;
+
+ void reset() {
+ this.delivery = null;
+ }
+
+ ACKMessageOperation setDelivery(Delivery delivery) {
+ this.delivery = delivery;
+ return this;
+ }
+
+ @Override
+ public void run() {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Delivery settling for " + delivery + ", context=" +
delivery.getContext());
+ }
+ delivery.disposition(Accepted.getInstance());
+ settle(delivery);
+ connection.flush();
+
AMQPMirrorControllerTarget.this.ackMessageMpscPool.release(ACKMessageOperation.this);
+ }
+
+ @Override
+ public void beforeCommit(Transaction tx) throws Exception {
+ }
+
+ @Override
+ public void afterCommit(Transaction tx) {
+ done();
+ }
+
+ @Override
+ public void done() {
+ connection.runNow(ACKMessageOperation.this);
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ logger.warn(errorMessage + "-" + errorMessage);
+ }
+ }
+
+ // in a regular case we should not have more than amqpCredits on the pool,
that's the max we would need
+ private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new
MpscPool<>(amqpCredits, ACKMessageOperation::reset, () -> new
ACKMessageOperation());
final RoutingContextImpl routingContext = new RoutingContextImpl(null);
- Map<SimpleString, Map<SimpleString, QueueConfiguration>> scanAddresses;
+ final BasicMirrorController<Receiver> basicController;
+
+ final ActiveMQServer server;
+
+ final DuplicateIDCache duplicateIDCache;
+
+ private final IDSupplier<MessageReference> referenceIDSupplier;
public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
AMQPSessionContext protonSession,
Receiver receiver,
ActiveMQServer server) {
super(sessionSPI, connection, protonSession, receiver);
+ this.basicController = new BasicMirrorController(server);
+ this.basicController.setLink(receiver);
Review comment:
@gemmellr Yes, I have seen this comment already. I'm still thinking about
how I should address this. Leave it with me for a day or so, and I will
comment back on this one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 620151)
Time Spent: 19h 10m (was: 19h)
> Enhance AMQP Mirror support with dual mirror
> --------------------------------------------
>
> Key: ARTEMIS-3243
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3243
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.17.0
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.18.0
>
> Time Spent: 19h 10m
> Remaining Estimate: 0h
>
> In the current Mirror version, we can only mirror in a single direction.
> With this enhancement, the two (or more) brokers would be connected to each
> other, each one having its own ID, and each one would send updates to the
> other broker(s).
> The outcome is that if you just transferred producers and consumers from one
> broker to the other, the fallback would be automatic and simple, with no need
> to disable and enable mirror options.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)