[ 
https://issues.apache.org/jira/browse/ARTEMIS-3243?focusedWorklogId=619365&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-619365
 ]

ASF GitHub Bot logged work on ARTEMIS-3243:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Jul/21 13:54
            Start Date: 06/Jul/21 13:54
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on a change in pull request #3633:
URL: https://github.com/apache/activemq-artemis/pull/3633#discussion_r663101506



##########
File path: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedList.java
##########
@@ -34,10 +32,10 @@
 
    void clearID();
 
-   /** The ID Supplier function needs to return positive IDs (greater or equal 
to 0)
-    *  If you spply a negative ID, it will be considered a null value, and
+   /** The ID Supplier function needs to return non zero IDs.
+    *  If you spply a zero ID, it will be considered a null value, and

Review comment:
       Javadoc should be on the IDSupplier.getID() method now that it is an 
interface + method, and updated as it's no longer just a function.
   
   Missing letter u in "supply".

##########
File path: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
##########
@@ -53,30 +54,23 @@ public LinkedListImpl(Comparator<E> comparator) {
       this(comparator, null);
    }
 
-   public LinkedListImpl(Comparator<E> comparator, ToLongFunction<E> supplier) 
{
+   public LinkedListImpl(Comparator<E> comparator, IDSupplier<E> supplier) {
       iters = createIteratorArray(INITIAL_ITERATOR_ARRAY_SIZE);
       this.comparator = comparator;
       this.idSupplier = supplier;
-      if (idSupplier != null) {
-         this.nodeMap = newLongHashMap();
-      } else {
-         this.nodeMap = null;
-      }
    }
 
    @Override
    public void clearID() {
-      idSupplier = null;
-      if (nodeMap != null) {
-         nodeMap.clear();
-         nodeMap = null;
+      if (idSupplier != null) {

Review comment:
       This method seems to be unused; should we just remove it? Or at least 
add a test that verifies it calls clear on the idSupplier.

##########
File path: 
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
##########
@@ -935,7 +935,6 @@ public void appendAddRecord(final long id,
          throw 
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, 
maxRecordSize);
       }
 
-      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, 
callback);

Review comment:
       Unclear why this is being removed. Seems like it would be a fair change 
in behaviour, and a bunch of separate changes resulted in it being the way it 
was. Should this have its own JIRA perhaps?

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceIDSupplier.java
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.util.HashMap;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.utils.collections.IDSupplier;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
+
+public class ReferenceIDSupplier implements IDSupplier<MessageReference> {
+
+   private final String serverID;
+
+   public ReferenceIDSupplier(ActiveMQServer server) {
+      this.serverID = server.getNodeID().toString();
+   }
+
+   HashMap<Object, LongObjectHashMap<LinkedListImpl.Node<MessageReference>>> 
lists;
+
+   @Override
+   public LongObjectHashMap<LinkedListImpl.Node<MessageReference>> 
getNodesMap(String listID) {
+      if (listID == null) {
+         listID = serverID; // returning for the localList in case it's null
+      }
+      if (lists == null) {
+         lists = new HashMap<>();
+      }

Review comment:
       This seems racy, as multiple threads could be going through here. If we 
created the ReferenceIDSupplier instance, I think it's reasonable to assume 
there will be a need for the field at some point, so create the map during 
construction.
   
   The 'lists' map itself could be used by multiple threads and so should be a 
ConcurrentHashMap.
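
   A minimal sketch of that suggestion, not the PR's code. EagerListsSketch is a hypothetical stand-in for ReferenceIDSupplier, and a plain Map<Long, String> stands in for LongObjectHashMap<LinkedListImpl.Node<MessageReference>> so the example stays self-contained:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ReferenceIDSupplier (names and value types are assumptions).
final class EagerListsSketch {

   private final String serverID;

   // Created once at construction and never reassigned, so no thread can
   // observe a null field or race on a lazy "if (lists == null)" check.
   private final ConcurrentHashMap<String, Map<Long, String>> lists = new ConcurrentHashMap<>();

   EagerListsSketch(String serverID) {
      this.serverID = serverID;
   }

   Map<Long, String> getNodesMap(String listID) {
      // A null listID means the local list, as in the original method.
      String key = listID == null ? serverID : listID;
      // computeIfAbsent is atomic on ConcurrentHashMap: one map per key.
      return lists.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
   }

   public static void main(String[] args) {
      EagerListsSketch supplier = new EagerListsSketch("local");
      System.out.println(supplier.getNodesMap(null) == supplier.getNodesMap("local"));
   }
}
```

   The final field removes the lazy-initialisation race entirely; whether the per-list maps also need to be thread-safe is a separate question.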

##########
File path: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
##########
@@ -91,10 +85,12 @@ public void setIDSupplier(ToLongFunction<E> supplier) {
       return new LongObjectHashMap<>(Math.max(8, this.size));
    }
 
-   private void putID(E value, Node<E> position) {
-      long id = idSupplier.applyAsLong(value);
-      if (id >= 0) {
-         nodeMap.put(id, position);
+   private void putID(E value, Node<E> node) {
+      String listID = idSupplier.getListID(value);
+      LongObjectHashMap<Node<E>> nodesMap = idSupplier.getNodesMap(listID);

Review comment:
       Is the split lookup used to particular effect, or could it be collapsed 
into a single lookup?

##########
File path: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
##########
@@ -258,7 +258,7 @@ public int size() {
     * Return the number of elements we have on suppliedIDs
     */
    public int getSizeOfSuppliedIDs() {
-      return nodeMap == null ? 0 : nodeMap.size();
+      return idSupplier == null ? 0 : idSupplier.size();

Review comment:
       Seems this is test-only. Could it just be removed, if we can now look 
up the individual maps? (Assuming we don't remove that ability, per the 
previous comment.)

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
##########
@@ -1062,6 +1062,9 @@ public final Object getDuplicateProperty() {
 
    @Override
    public boolean isDurable() {
+      if (!isLargeMessage()) {
+         ensureMessageDataScanned();
+      }

Review comment:
       This has side effects. It's not clear to me that calling 'isDurable' 
ever really should. What changed that made it necessary?
   
   It also seems to cause a test failure in 
AmqpJournalLoadingTest.durableMessageDataNotScannedOnRestartTest, and it may 
effectively undo the related work Franz did on optimising startup.

##########
File path: 
artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
##########
@@ -39,6 +39,15 @@
 
    private static Logger log = Logger.getLogger(ByteUtilTest.class);
 
+   @Test
+   public void testBytesToLong() {
+      long randomA = RandomUtil.randomLong();
+      byte[] randomABytes = ByteUtil.longToBytes(randomA);
+      long randomAOutput = ByteUtil.bytesToLong(randomABytes);
+
+      Assert.assertEquals(randomA, randomAOutput);

Review comment:
       Would be good to also have a test with known input and expected output.
   
   This test wouldn't detect a case where ByteUtil.longToBytes() and 
ByteUtil.bytesToLong() were both broken/changed in unexpected but compensating 
ways (e.g. both flipping their endianness).
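
   A sketch of what a known-value check could look like. The two helpers below use java.nio.ByteBuffer as stand-ins for ByteUtil.longToBytes() and ByteUtil.bytesToLong(); the big-endian layout is an assumption made for the example, not something confirmed about ByteUtil:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical stand-ins for the ByteUtil methods (big-endian is assumed here).
final class KnownValueLongBytes {

   static byte[] longToBytes(long value) {
      // ByteBuffer defaults to big-endian byte order.
      return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
   }

   static long bytesToLong(byte[] bytes) {
      return ByteBuffer.wrap(bytes).getLong();
   }

   public static void main(String[] args) {
      long known = 0x0102030405060708L;
      byte[] expected = {1, 2, 3, 4, 5, 6, 7, 8};

      // Each direction is checked against a fixed value, so a compensating
      // change in both methods (e.g. both switching byte order) is caught.
      if (!Arrays.equals(expected, longToBytes(known))) {
         throw new AssertionError("unexpected byte layout");
      }
      if (bytesToLong(expected) != known) {
         throw new AssertionError("unexpected decoded value");
      }
   }
}
```

   A round-trip test on random input can stay alongside this; the fixed value just pins the wire layout.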

##########
File path: 
artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ArtemisCreatePlugin.java
##########
@@ -93,6 +93,9 @@
    @Parameter(defaultValue = "false")
    private boolean replicated;
 
+   @Parameter(defaultValue = "false")
+   private boolean cloud;
+

Review comment:
       Leftovers to be removed?

##########
File path: 
artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.uri;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
+import org.junit.Test;
+
+public class ConnectorTransportConfigurationParserURITest {
+
+   @Test
+   public void testParse() throws Exception {
+      ConnectorTransportConfigurationParser parser = new 
ConnectorTransportConfigurationParser(false);
+
+      URI transportURI = 
parser.expandURI("tcp://localhost:6161#other:3333,bababa:4444");
+      System.out.println(transportURI);
+      List<TransportConfiguration> objects = parser.newObject(transportURI, 
"test");
+      objects.forEach((t) -> System.out.println(t));

Review comment:
       Is this needed? What is it testing? No assertions, just printlns.

##########
File path: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java
##########
@@ -21,21 +21,25 @@
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
+import org.jboss.logging.Logger;
+
 /**
  * A simple encapsulation to provide a pool of objects.
  * @param <T>
  */
 public abstract class Pool<T> {
 
+   private static final Logger logger = Logger.getLogger(Pool.class);
+
    private final Queue<T> internalPool;
 
    private final Consumer<T> cleaner;
-   private final Supplier<T> supplier;
+   protected final Supplier<T> supplier;

Review comment:
       Does this still need to be made protected?

##########
File path: 
artemis-maven-plugin/src/main/java/org/apache/activemq/artemis/maven/ArtemisCreatePlugin.java
##########
@@ -232,6 +235,10 @@ protected void doExecute() throws MojoExecutionException, 
MojoFailureException {
          add(listCommands, "--no-autotune");
       }
 
+      if (cloud) {
+         add(listCommands, "--cloud");
+      }
+

Review comment:
       Leftovers to be removed?

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -170,15 +215,26 @@ public void sendMessage(Message message, RoutingContext 
context, List<MessageRef
       }
    }
 
-   public static void validateProtocolData(MessageReference ref, SimpleString 
snfAddress) {
+   public static void validateProtocolData(ReferenceIDSupplier 
referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
       if (ref.getProtocolData() == null && 
!ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
-         setProtocolData(ref);
+         setProtocolData(referenceIDSupplier, ref);
       }
    }
 
-   private static void setProtocolData(MessageReference ref) {
+   private static void setProtocolData(ReferenceIDSupplier 
referenceIDSupplier, MessageReference ref) {
       Map<Symbol, Object> daMap = new HashMap<>();
       DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);
+
+      String brokerID = referenceIDSupplier.getListID(ref);
+
+      // getListID will return null when the message was generated on this 
broker.
+      // on this case we do not send the brokerID, and the ControllerTarget 
will get the information from the link.
+      // this is just to safe a few bytes and some processing on the wire.
+      if (brokerID != null) {
+         // not sending the brokerID, will make the other side to get the 
brokerID from the remote link's property
+         daMap.put(BROKER_ID, brokerID);
+      }

Review comment:
       Great, I was constantly thinking of this when reviewing the delta last 
week to stop munging the mirror id into the store id.

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -202,16 +258,36 @@ private static Properties getProperties(Message message) {
 
    @Override
    public void postAcknowledge(MessageReference ref, AckReason reason) throws 
Exception {
+
+      MirrorController targetController = getControllerTarget();
+
+      if (targetController != null || ref.getQueue() != null && 
(ref.getQueue().isInternalQueue() || ref.getQueue().isMirrorController())) {
+         if (logger.isTraceEnabled()) {
+            logger.trace(server + " rejecting postAcknowledge queue=" + 
ref.getQueue().getName() + ", ref=" + ref + " to avoid infinite loop with the 
mirror (reflection)");
+         }
+         return;
+      }

Review comment:
       Should this go after an 'if(acks)' check to avoid useless and 
potentially misleading logging? Previously that was the first check done in the 
method.

##########
File path: artemis-server/src/main/resources/schema/artemis-configuration.xsd
##########
@@ -2188,8 +2189,16 @@
       <xsd:attribute name="source-mirror-address" type="xsd:string" 
use="optional" default="">
          <xsd:annotation>
             <xsd:documentation>
-               By default the replica will use a temporary store and forward 
queue to store events towards the mirror / replica.
-               However if this is set, we will use a defined durable queue.
+               Deprecated: this is ignored from any parsing.
+               The xsd is allowing this for compatibility reasons. This will 
be eventually removed from the codebase.

Review comment:
       If it's ignored, then won't it just leave them with a leftover queue, 
plus any pending content silently not being mirrored? Seems better to fail 
startup. I would remove it then. 

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -267,102 +310,143 @@ public void deleteAddress(AddressInfo addressInfo) 
throws Exception {
    @Override
    public void createQueue(QueueConfiguration queueConfiguration) throws 
Exception {
       if (logger.isDebugEnabled()) {
-         logger.debug("Adding queue " + queueConfiguration);
+         logger.debug(server + " Adding queue " + queueConfiguration);
       }
       server.createQueue(queueConfiguration, true);
-
-      if (scanAddresses != null) {
-         
getQueueScanMap(queueConfiguration.getAddress()).put(queueConfiguration.getName(),
 queueConfiguration);
-      }
    }
 
    @Override
    public void deleteQueue(SimpleString addressName, SimpleString queueName) 
throws Exception {
       if (logger.isDebugEnabled()) {
-         logger.debug("destroy queue " + queueName + " on address = " + 
addressName);
+         logger.debug(server + " destroy queue " + queueName + " on address = 
" + addressName + " server " + server.getIdentity());
       }
       try {
-         server.destroyQueue(queueName);
+         server.destroyQueue(queueName,null, false, true, false, false);
       } catch (ActiveMQNonExistentQueueException expected) {
-         logger.debug("queue " + queueName + " was previously removed", 
expected);
+         logger.debug(server + " queue " + queueName + " was previously 
removed", expected);
       }
    }
 
-   private static ToLongFunction<MessageReference> referenceIDSupplier = 
(source) -> {
-      Long id = (Long) 
source.getMessage().getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
-      if (id == null) {
-         return -1;
-      } else {
-         return id;
+   public boolean postAcknowledge(String address, String queue, String nodeID, 
long messageID, ACKMessageOperation ackMessage) throws Exception {
+      final Queue targetQueue = server.locateQueue(queue);
+
+      if (targetQueue == null) {
+         logger.warn("Queue " + queue + " not found on mirror target, ignoring 
ack for queue=" + queue + ", messageID=" + messageID + ", nodeID=" + nodeID);
+         return false;
       }
-   };
 
-   public void postAcknowledge(String address, String queue, long messageID) {
       if (logger.isDebugEnabled()) {
-         logger.debug("post acking " + address + ", queue = " + queue + ", 
messageID = " + messageID);
+         // we only do the following check if debug
+         if (targetQueue.getConsumerCount() > 0) {
+            logger.debug("server " + server.getIdentity() + ", queue " + 
targetQueue.getName() + " has consumers while delivering ack for " + messageID);
+         }
       }
 
-      Queue targetQueue = server.locateQueue(queue);
-      if (targetQueue != null) {
-         MessageReference reference = 
targetQueue.removeWithSuppliedID(messageID, referenceIDSupplier);
-         if (reference != null) {
-            if (logger.isDebugEnabled()) {
-               logger.debug("Acking reference " + reference);
-            }
-            try {
-               targetQueue.acknowledge(reference);
-            } catch (Exception e) {
-               // TODO anything else I can do here?
-               // such as close the connection with error?
-               logger.warn(e.getMessage(), e);
-            }
-         } else {
-            if (logger.isTraceEnabled()) {
-               logger.trace("There is no reference to ack on " + messageID);
-            }
+      if (logger.isTraceEnabled()) {
+         logger.trace("Server " + server.getIdentity() + " with queue = " + 
queue + " being acked for " + messageID + " coming from " + messageID + " 
targetQueue = " + targetQueue);
+      }
+
+      performAck(nodeID, messageID, targetQueue, ackMessage, true);
+      return true;
+
+   }
+
+   private void performAck(String nodeID, long messageID, Queue targetQueue, 
ACKMessageOperation ackMessageOperation, boolean retry) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("performAck (nodeID=" + nodeID + ", messageID=" + 
messageID + ")" + ", targetQueue=" + targetQueue.getName());
+      }
+      MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, 
messageID, referenceIDSupplier);
+      if (reference == null && retry) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Retrying Reference not found on messageID=" + 
messageID + " nodeID=" + nodeID);
+         }
+         targetQueue.flushOnIntermediate(() -> {
+            recoverContext();
+            performAck(nodeID, messageID, targetQueue, ackMessageOperation, 
false);
+         });
+         return;
+      }
+      if (reference != null) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Post ack Server " + server + " worked well for 
messageID=" + messageID + " nodeID=" + nodeID);
+         }
+         try {
+            targetQueue.acknowledge(reference);
+            
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         }
+      } else {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Post ack Server " + server + " could not find 
messageID = " + messageID +
+                            " representing nodeID=" + nodeID);
          }
       }
 
    }
 
-   private void sendMessage(AMQPMessage message) throws Exception {
+   private boolean sendMessage(AMQPMessage message, ACKMessageOperation 
messageCompletionAck) throws Exception {
+
       if (message.getMessageID() <= 0) {
          message.setMessageID(server.getStorageManager().generateID());
       }
 
-      Long internalID = (Long) 
AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_ID);
+      String internalMirrorID = 
(String)AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, 
BROKER_ID);
+      if (internalMirrorID == null) {
+         internalMirrorID = getRemoteMirrorId(); // not pasisng the ID means 
the data was generated on the remote broker
+      }
+      Long internalIDLong = (Long) 
AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_ID);
       String internalAddress = (String) 
AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, 
INTERNAL_DESTINATION);
 
-      if (internalID != null) {
+      long internalID = 0;
+
+      if (internalIDLong != null) {
+         internalID = internalIDLong;
+      }
+
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("sendMessage on server " + server + " for message " + 
message +
+                         " with internalID = " + internalIDLong + " mirror id 
" + internalMirrorID);
+      }
+
+      final TransactionImpl transaction = new 
MirrorTransaction(server.getStorageManager());
+      transaction.addOperation(messageCompletionAck);
+
+      routingContext.setDuplicateDetection(false); // we do our own duplicate 
detection here
+
+      if (internalID != 0) {
+         byte[] duplicateIDBytes = ByteUtil.longToBytes(internalID);

Review comment:
       This value now uses only the long id. Every broker generates the same 
sequence of ids, so they can overlap. Seems like the server id needs to be 
considered too if it's desired to support e.g. chained mirrors, as you have 
mentioned?
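
   One possible shape for that, purely as a sketch: CompositeDuplicateID and duplicateID are hypothetical names, and the layout (UTF-8 bytes of the node ID followed by the 8-byte long) is an assumption rather than anything in the PR:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper: builds a duplicate ID from the originating broker's
// node ID plus the message's long id, so ids from different brokers differ.
final class CompositeDuplicateID {

   static byte[] duplicateID(String nodeID, long internalID) {
      byte[] node = nodeID.getBytes(StandardCharsets.UTF_8);
      // The long always occupies the last 8 bytes, so the split is unambiguous.
      return ByteBuffer.allocate(node.length + Long.BYTES)
                       .put(node)
                       .putLong(internalID)
                       .array();
   }

   public static void main(String[] args) {
      byte[] fromA = duplicateID("broker-a", 1L);
      byte[] fromB = duplicateID("broker-b", 1L);
      // Same long id, different brokers: the duplicate IDs no longer collide.
      System.out.println(Arrays.equals(fromA, fromB));
   }
}
```

   This trades a few extra bytes per duplicate ID for uniqueness across brokers.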

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceIDSupplier.java
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.util.HashMap;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.utils.collections.IDSupplier;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
+
+public class ReferenceIDSupplier implements IDSupplier<MessageReference> {
+
+   private final String serverID;
+
+   public ReferenceIDSupplier(ActiveMQServer server) {
+      this.serverID = server.getNodeID().toString();
+   }
+
+   HashMap<Object, LongObjectHashMap<LinkedListImpl.Node<MessageReference>>> 
lists;
+
+   @Override
+   public LongObjectHashMap<LinkedListImpl.Node<MessageReference>> 
getNodesMap(String listID) {
+      if (listID == null) {
+         listID = serverID; // returning for the localList in case it's null
+      }
+      if (lists == null) {
+         lists = new HashMap<>();
+      }
+      LongObjectHashMap<LinkedListImpl.Node<MessageReference>> theList = 
lists.get(listID);
+      if (theList == null) {
+         theList = new LongObjectHashMap<>();
+         lists.put(listID, theList);
+      }
+      return theList;
+   }
+
+   @Override
+   public String getListID(MessageReference element) {
+      Object nodeID = 
element.getMessage().getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
+      if (nodeID != null) {
+         return nodeID.toString();
+      } else {
+         // the LinkedLlist always need an element here to play with the 
proper list;

Review comment:
       I don't really understand this comment: it's returning null, but the 
comment says the LinkedList always needs an element.
   
   EDIT: In getNodesMap() it replaces null with the server id and comments 
why, which seems to explain the return here, but the two comments somewhat 
contradict each other. Personally I would just delete this comment, but if 
not, then adjust it to indicate that a null id means the local list, as the 
other method does (and as AMQPMirrorControllerSource's use of it does).

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceIDSupplier.java
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.util.HashMap;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.utils.collections.IDSupplier;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
+
+public class ReferenceIDSupplier implements IDSupplier<MessageReference> {
+
+   private final String serverID;
+
+   public ReferenceIDSupplier(ActiveMQServer server) {
+      this.serverID = server.getNodeID().toString();
+   }
+
+   HashMap<Object, LongObjectHashMap<LinkedListImpl.Node<MessageReference>>> 
lists;
+
+   @Override
+   public LongObjectHashMap<LinkedListImpl.Node<MessageReference>> 
getNodesMap(String listID) {
+      if (listID == null) {
+         listID = serverID; // returning for the localList in case it's null
+      }
+      if (lists == null) {
+         lists = new HashMap<>();
+      }
+      LongObjectHashMap<LinkedListImpl.Node<MessageReference>> theList = 
lists.get(listID);
+      if (theList == null) {
+         theList = new LongObjectHashMap<>();
+         lists.put(listID, theList);
+      }

Review comment:
       This is also unsafe if multiple threads operate on the same entry and 
try to create it; it should probably use putIfAbsent and check the return. 
   
   Of course, if this is possible it also means the returned entry itself 
probably needs protecting too; I doubt it is concurrency safe.
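
   A sketch of the putIfAbsent pattern, under assumptions: PutIfAbsentSketch is a hypothetical class, and Map<Long, String> backed by ConcurrentHashMap stands in for the per-list LongObjectHashMap so that the returned entry is also safe to share:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical get-or-create helper (names and value types are assumptions).
final class PutIfAbsentSketch {

   private final ConcurrentHashMap<String, Map<Long, String>> lists = new ConcurrentHashMap<>();

   Map<Long, String> getOrCreate(String listID) {
      Map<Long, String> existing = lists.get(listID);
      if (existing != null) {
         return existing;
      }
      Map<Long, String> created = new ConcurrentHashMap<>();
      // putIfAbsent returns the previous value if another thread won the race,
      // or null if our map was the one stored.
      Map<Long, String> raced = lists.putIfAbsent(listID, created);
      return raced != null ? raced : created;
   }

   public static void main(String[] args) {
      PutIfAbsentSketch sketch = new PutIfAbsentSketch();
      System.out.println(sketch.getOrCreate("node-1") == sketch.getOrCreate("node-1"));
   }
}
```

   Checking the return value is what guarantees every caller ends up with the same per-list map, even when two threads create it at once.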

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -38,51 +35,136 @@
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import 
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.collections.IDSupplier;
+import org.apache.activemq.artemis.utils.pools.MpscPool;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.jboss.logging.Logger;
 
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADD_ADDRESS;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.BROKER_ID;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.CREATE_QUEUE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_START;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_END;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
 
 public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver 
implements MirrorController {
 
-   public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = 
SimpleString.toSimpleString(INTERNAL_ID.toString());
-
    private static final Logger logger = 
Logger.getLogger(AMQPMirrorControllerTarget.class);
 
-   final ActiveMQServer server;
+   private static ThreadLocal<MirrorController> controllerThreadLocal = new 
ThreadLocal<>();
+
+   public static void setControllerTarget(MirrorController controller) {
+      controllerThreadLocal.set(controller);
+   }
+
+   public static MirrorController getControllerTarget() {
+      return controllerThreadLocal.get();
+   }
+
+   class ACKMessageOperation extends TransactionOperationAbstract implements 
IOCallback, Runnable {
+
+      Delivery delivery;
+
+      void reset() {
+         this.delivery = null;
+      }
+
+      ACKMessageOperation setDelivery(Delivery delivery) {
+         this.delivery = delivery;
+         return this;
+      }
+
+      @Override
+      public void run() {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Delivery settling for " + delivery + ", context=" + 
delivery.getContext());
+         }
+         delivery.disposition(Accepted.getInstance());
+         settle(delivery);
+         connection.flush();
+         
AMQPMirrorControllerTarget.this.ackMessageMpscPool.release(ACKMessageOperation.this);
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) throws Exception {
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         done();
+      }
+
+      @Override
+      public void done() {
+         connection.runNow(ACKMessageOperation.this);
+      }
+
+      @Override
+      public void onError(int errorCode, String errorMessage) {
+         logger.warn(errorMessage + "-"  + errorMessage);
+      }
+   }
+
+   // in a regular case we should not have more than amqpCredits on the pool, 
that's the max we would need
+   private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new 
MpscPool<>(amqpCredits, ACKMessageOperation::reset, () -> new 
ACKMessageOperation());
 
    final RoutingContextImpl routingContext = new RoutingContextImpl(null);
 
-   Map<SimpleString, Map<SimpleString, QueueConfiguration>> scanAddresses;
+   final BasicMirrorController<Receiver> basicController;
+
+   final ActiveMQServer server;
+
+   final DuplicateIDCache duplicateIDCache;
+
+   private final IDSupplier<MessageReference> referenceIDSupplier;
 
    public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
                                      AMQPConnectionContext connection,
                                      AMQPSessionContext protonSession,
                                      Receiver receiver,
                                      ActiveMQServer server) {
       super(sessionSPI, connection, protonSession, receiver);
+      this.basicController = new BasicMirrorController(server);
+      this.basicController.setLink(receiver);

Review comment:
       I would make this part of the construction, and verify the remote broker 
id at this point, failing if it isn't found, given there are uses later that rely 
on it (but don't really check it).
   
   I would probably add an offered+desired link capabilities handshake check 
too, to verify that the peer on the other side understands the new 
mirroring setup.
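
   A minimal sketch of the fail-fast idea above, not the Artemis code: 
`MirrorTargetSketch`, `REQUIRED_CAPABILITY` and the string-based capability set 
are hypothetical stand-ins, and the real check would work on the proton 
`Receiver` link's properties and capabilities.

```java
import java.util.Set;

// Hypothetical stand-in for the mirror target; none of these names exist in Artemis.
final class MirrorTargetSketch {

    // Assumed capability name, for illustration only.
    static final String REQUIRED_CAPABILITY = "mirror-sketch";

    private final String remoteBrokerId;

    private MirrorTargetSketch(String remoteBrokerId) {
        this.remoteBrokerId = remoteBrokerId;
    }

    // Validate everything once, at construction, so later uses can rely on it.
    static MirrorTargetSketch create(String remoteBrokerId, Set<String> remoteOfferedCapabilities) {
        if (remoteBrokerId == null || remoteBrokerId.isEmpty()) {
            throw new IllegalArgumentException("remote broker id is required for mirroring");
        }
        if (remoteOfferedCapabilities == null || !remoteOfferedCapabilities.contains(REQUIRED_CAPABILITY)) {
            throw new IllegalStateException("peer did not offer capability " + REQUIRED_CAPABILITY);
        }
        return new MirrorTargetSketch(remoteBrokerId);
    }

    String remoteBrokerId() {
        return remoteBrokerId;
    }
}
```

   With a static factory like this, a half-initialised target never escapes: 
the caller either gets an instance with a verified id or an exception at link 
setup.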

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
##########
@@ -111,6 +116,15 @@ public ProtonProtocolManager(ProtonProtocolManagerFactory 
factory, ActiveMQServe
       this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
    }
 
+   public ReferenceIDSupplier getReferenceIDSupplier() {
+      if (referenceIDSupplier == null) {
+         // we lazy start the instance.
+         // only create it when needed
+         referenceIDSupplier = new ReferenceIDSupplier(server);
+      }

Review comment:
       If it is important that there is only one of these, as it seems, its 
creation should perhaps be better protected. It looks a bit racy, as the get 
method can be called from different threads for different reasons.
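
   One possible way to protect the lazy creation, sketched with a generic 
holder rather than the actual `ProtonProtocolManager` field: `LazySketch` is a 
hypothetical name, and double-checked locking on a `volatile` field is only one 
option (a `synchronized` getter would also do).

```java
import java.util.function.Supplier;

// Hypothetical helper: creates the wrapped instance at most once, even under contention.
final class LazySketch<T> {

    private final Supplier<T> factory;

    // volatile so a fully constructed instance is visible to every thread.
    private volatile T instance;

    LazySketch(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                // Re-check under the lock: another thread may have won the race.
                local = instance;
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }
}
```

   The fast path stays lock-free once the instance exists, so callers on 
different threads only contend during the first creation.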

##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
##########
@@ -501,4 +501,7 @@ IllegalStateException invalidRoutingTypeUpdate(String 
queueName,
 
    @Message(id = 229234, value = "Invalid slow consumer threshold measurement 
unit {0}", format = Message.Format.MESSAGE_FORMAT)
    IllegalArgumentException invalidSlowConsumerThresholdMeasurementUnit(String 
val);
+
+   @Message(id = 229235, value = "broker-mirror-id needs to be between 1 and 
255, while you passed {0} on the configuration", format = 
Message.Format.MESSAGE_FORMAT)
+   IllegalArgumentException invalidBrokerID(Object val);

Review comment:
       Leftovers for removal?

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -38,51 +35,136 @@
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import 
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import 
org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.collections.IDSupplier;
+import org.apache.activemq.artemis.utils.pools.MpscPool;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.jboss.logging.Logger;
 
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADD_ADDRESS;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.BROKER_ID;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.CREATE_QUEUE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_ADDRESS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_START;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.ADDRESS_SCAN_END;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
 
 public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver 
implements MirrorController {
 
-   public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = 
SimpleString.toSimpleString(INTERNAL_ID.toString());
-
    private static final Logger logger = 
Logger.getLogger(AMQPMirrorControllerTarget.class);
 
-   final ActiveMQServer server;
+   private static ThreadLocal<MirrorController> controllerThreadLocal = new 
ThreadLocal<>();
+
+   public static void setControllerTarget(MirrorController controller) {
+      controllerThreadLocal.set(controller);
+   }
+
+   public static MirrorController getControllerTarget() {
+      return controllerThreadLocal.get();
+   }
+
+   class ACKMessageOperation extends TransactionOperationAbstract implements 
IOCallback, Runnable {
+
+      Delivery delivery;
+
+      void reset() {
+         this.delivery = null;
+      }
+
+      ACKMessageOperation setDelivery(Delivery delivery) {
+         this.delivery = delivery;
+         return this;
+      }
+
+      @Override
+      public void run() {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Delivery settling for " + delivery + ", context=" + 
delivery.getContext());
+         }
+         delivery.disposition(Accepted.getInstance());
+         settle(delivery);
+         connection.flush();
+         
AMQPMirrorControllerTarget.this.ackMessageMpscPool.release(ACKMessageOperation.this);
+      }
+
+      @Override
+      public void beforeCommit(Transaction tx) throws Exception {
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         done();
+      }
+
+      @Override
+      public void done() {
+         connection.runNow(ACKMessageOperation.this);
+      }

Review comment:
       When is 'done' called vs 'afterCommit', and can both ever happen? I ask 
because both cause the run method to be executed, and run interacts with the 
usage and the pool.
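
   If both paths can fire, one way to make that harmless is a run-once guard. 
The sketch below is an assumption about a possible fix, not what the PR does; 
`OnceSketch` is a hypothetical class and the wrapped `Runnable` stands in for 
the settle + release-to-pool work.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical operation whose completion work must run once per use.
final class OnceSketch implements Runnable {

    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final Runnable action;

    OnceSketch(Runnable action) {
        this.action = action;
    }

    // Both completion paths funnel into run(), mirroring the shape of the reviewed code.
    void afterCommit() {
        done();
    }

    void done() {
        run();
    }

    @Override
    public void run() {
        // Only the first caller flips the flag and performs the work.
        if (completed.compareAndSet(false, true)) {
            action.run();
        }
    }

    // Called when the object is handed out again, for example by a pool.
    void reset() {
        completed.set(false);
    }
}
```

   Resetting the flag only when the object is borrowed again keeps a late 
second callback from settling a delivery twice or releasing the same object to 
the pool twice.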




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 619365)
    Time Spent: 11h  (was: 10h 50m)

> Enhance AMQP Mirror support with dual mirror
> --------------------------------------------
>
>                 Key: ARTEMIS-3243
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3243
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.17.0
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.18.0
>
>          Time Spent: 11h
>  Remaining Estimate: 0h
>
> In the current Mirror version, we can only mirror in a single direction.
> With this enhancement, two (or more) brokers would be connected to each 
> other, each one having its own ID, and each one would send updates to the 
> other broker.
> The outcome is that if you just transferred producers and consumers from one 
> broker to the other, the fallback would be automatic and simple. No need to 
> disable and enable mirror options.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
