[ 
https://issues.apache.org/jira/browse/ARTEMIS-3243?focusedWorklogId=620023&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-620023
 ]

ASF GitHub Bot logged work on ARTEMIS-3243:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Jul/21 15:10
            Start Date: 07/Jul/21 15:10
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on a change in pull request 
#3633:
URL: https://github.com/apache/activemq-artemis/pull/3633#discussion_r665465536



##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -267,102 +310,143 @@ public void deleteAddress(AddressInfo addressInfo) 
throws Exception {
    @Override
    public void createQueue(QueueConfiguration queueConfiguration) throws 
Exception {
       if (logger.isDebugEnabled()) {
-         logger.debug("Adding queue " + queueConfiguration);
+         logger.debug(server + " Adding queue " + queueConfiguration);
       }
       server.createQueue(queueConfiguration, true);
-
-      if (scanAddresses != null) {
-         
getQueueScanMap(queueConfiguration.getAddress()).put(queueConfiguration.getName(),
 queueConfiguration);
-      }
    }
 
    @Override
    public void deleteQueue(SimpleString addressName, SimpleString queueName) 
throws Exception {
       if (logger.isDebugEnabled()) {
-         logger.debug("destroy queue " + queueName + " on address = " + 
addressName);
+         logger.debug(server + " destroy queue " + queueName + " on address = 
" + addressName + " server " + server.getIdentity());
       }
       try {
-         server.destroyQueue(queueName);
+         server.destroyQueue(queueName,null, false, true, false, false);
       } catch (ActiveMQNonExistentQueueException expected) {
-         logger.debug("queue " + queueName + " was previously removed", 
expected);
+         logger.debug(server + " queue " + queueName + " was previously 
removed", expected);
       }
    }
 
-   private static ToLongFunction<MessageReference> referenceIDSupplier = 
(source) -> {
-      Long id = (Long) 
source.getMessage().getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
-      if (id == null) {
-         return -1;
-      } else {
-         return id;
+   public boolean postAcknowledge(String address, String queue, String nodeID, 
long messageID, ACKMessageOperation ackMessage) throws Exception {
+      final Queue targetQueue = server.locateQueue(queue);
+
+      if (targetQueue == null) {
+         logger.warn("Queue " + queue + " not found on mirror target, ignoring 
ack for queue=" + queue + ", messageID=" + messageID + ", nodeID=" + nodeID);
+         return false;
       }
-   };
 
-   public void postAcknowledge(String address, String queue, long messageID) {
       if (logger.isDebugEnabled()) {
-         logger.debug("post acking " + address + ", queue = " + queue + ", 
messageID = " + messageID);
+         // we only do the following check if debug
+         if (targetQueue.getConsumerCount() > 0) {
+            logger.debug("server " + server.getIdentity() + ", queue " + 
targetQueue.getName() + " has consumers while delivering ack for " + messageID);
+         }
       }
 
-      Queue targetQueue = server.locateQueue(queue);
-      if (targetQueue != null) {
-         MessageReference reference = 
targetQueue.removeWithSuppliedID(messageID, referenceIDSupplier);
-         if (reference != null) {
-            if (logger.isDebugEnabled()) {
-               logger.debug("Acking reference " + reference);
-            }
-            try {
-               targetQueue.acknowledge(reference);
-            } catch (Exception e) {
-               // TODO anything else I can do here?
-               // such as close the connection with error?
-               logger.warn(e.getMessage(), e);
-            }
-         } else {
-            if (logger.isTraceEnabled()) {
-               logger.trace("There is no reference to ack on " + messageID);
-            }
+      if (logger.isTraceEnabled()) {
+         logger.trace("Server " + server.getIdentity() + " with queue = " + 
queue + " being acked for " + messageID + " coming from " + messageID + " 
targetQueue = " + targetQueue);
+      }
+
+      performAck(nodeID, messageID, targetQueue, ackMessage, true);
+      return true;
+
+   }
+
+   private void performAck(String nodeID, long messageID, Queue targetQueue, 
ACKMessageOperation ackMessageOperation, boolean retry) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("performAck (nodeID=" + nodeID + ", messageID=" + 
messageID + ")" + ", targetQueue=" + targetQueue.getName());
+      }
+      MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, 
messageID, referenceIDSupplier);
+      if (reference == null && retry) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Retrying Reference not found on messageID=" + 
messageID + " nodeID=" + nodeID);
+         }
+         targetQueue.flushOnIntermediate(() -> {
+            recoverContext();
+            performAck(nodeID, messageID, targetQueue, ackMessageOperation, 
false);
+         });
+         return;
+      }
+      if (reference != null) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Post ack Server " + server + " worked well for 
messageID=" + messageID + " nodeID=" + nodeID);
+         }
+         try {
+            targetQueue.acknowledge(reference);
+            
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         }
+      } else {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Post ack Server " + server + " could not find 
messageID = " + messageID +
+                            " representing nodeID=" + nodeID);
          }
       }
 
    }
 
-   private void sendMessage(AMQPMessage message) throws Exception {
+   private boolean sendMessage(AMQPMessage message, ACKMessageOperation 
messageCompletionAck) throws Exception {
+
       if (message.getMessageID() <= 0) {
          message.setMessageID(server.getStorageManager().generateID());
       }
 
-      Long internalID = (Long) 
AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_ID);
+      String internalMirrorID = 
(String)AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, 
BROKER_ID);
+      if (internalMirrorID == null) {
+         internalMirrorID = getRemoteMirrorId(); // not pasisng the ID means 
the data was generated on the remote broker
+      }
+      Long internalIDLong = (Long) 
AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_ID);
       String internalAddress = (String) 
AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, 
INTERNAL_DESTINATION);
 
-      if (internalID != null) {
+      long internalID = 0;
+
+      if (internalIDLong != null) {
+         internalID = internalIDLong;
+      }
+
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("sendMessage on server " + server + " for message " + 
message +
+                         " with internalID = " + internalIDLong + " mirror id 
" + internalMirrorID);
+      }
+
+      final TransactionImpl transaction = new 
MirrorTransaction(server.getStorageManager());
+      transaction.addOperation(messageCompletionAck);
+
+      routingContext.setDuplicateDetection(false); // we do our own duplicate 
detection here
+
+      if (internalID != 0) {
+         byte[] duplicateIDBytes = ByteUtil.longToBytes(internalID);

Review comment:
       fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 620023)
    Time Spent: 17h 40m  (was: 17.5h)

> Enhance AMQP Mirror support with dual mirror
> --------------------------------------------
>
>                 Key: ARTEMIS-3243
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3243
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.17.0
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.18.0
>
>          Time Spent: 17h 40m
>  Remaining Estimate: 0h
>
> In the current Mirror version, we can only mirror in a single direction.
> With this enhancement, the two (or more) brokers would be connected to each 
> other, each one having its own ID, and each one would send updates to the 
> other broker(s).
> The outcome is that if you just transferred producers and consumers from one 
> broker to the other, the fallback would be automatic and simple. There is no 
> need to disable and enable mirror options.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to