This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 9199b1f2dc12a8b83c4bf7d2cd3fff346e5a838a
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Sep 25 12:15:59 2024 -0400

    ARTEMIS-5067 Preventing Race when one side is not connected in the mirror
    
    There was already some verification at 
AMQPMirrorControllerSource::invalidTarget
    however the verification failed on soak test ReplicatedBothNodesMirrorTest,
    and an user I was working with also gave me evidence of this happening.
    
    I'm improving the previous verification, which is actually a simplification 
that works on every case.
---
 .../connect/mirror/AMQPMirrorControllerSource.java |  16 +-
 .../connect/AMQPMirrorOneSideDisconnectedTest.java | 181 +++++++++++++++++++++
 2 files changed, 191 insertions(+), 6 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 5081f08c0d..93cf9e7d9e 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -334,11 +334,6 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
          return;
       }
 
-      if (invalidTarget(context.getMirrorSource(), message)) {
-         logger.trace("sendMessage::server {} is discarding send to avoid 
infinite loop (reflection with the mirror)", server);
-         return;
-      }
-
       if (ignoreAddress(address)) {
          logger.trace("sendMessage::server {} is discarding send to address 
{}, address doesn't match filter", server, address);
          return;
@@ -351,7 +346,16 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
 
          String nodeID = idSupplier.getServerID(message);
 
-         if (nodeID != null && nodeID.equals(getRemoteMirrorId())) {
+         String remoteID = getRemoteMirrorId();
+
+         if (remoteID == null) {
+            if (AMQPMirrorControllerTarget.getControllerInUse() != null) {
+               // In case source has not yet connected, we need to take the ID 
from the Target in use to avoid infinite reflections
+               remoteID = 
AMQPMirrorControllerTarget.getControllerInUse().getRemoteMirrorId();
+            }
+         }
+
+         if (nodeID != null && nodeID.equals(remoteID)) {
             logger.trace("sendMessage::Message {} already belonged to the 
node, {}, it won't circle send", message, getRemoteMirrorId());
             return;
          }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorOneSideDisconnectedTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorOneSideDisconnectedTest.java
new file mode 100644
index 0000000000..82e4fe070f
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorOneSideDisconnectedTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class AMQPMirrorOneSideDisconnectedTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   protected TransportConfiguration newAcceptorConfig(int port, String name) {
+      HashMap<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP,CORE,OPENWIRE");
+      HashMap<String, Object> amqpParams = new HashMap<>();
+      TransportConfiguration tc = new 
TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, name, amqpParams);
+      return tc;
+   }
+
+   protected ActiveMQServer createServer(int port, String brokerName) throws 
Exception {
+
+      final ActiveMQServer server = this.createServer(true, true);
+
+      server.getConfiguration().getAcceptorConfigurations().clear();
+      
server.getConfiguration().getAcceptorConfigurations().add(newAcceptorConfig(port,
 "netty-acceptor"));
+      server.getConfiguration().setName(brokerName);
+      
server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory()
 + port);
+      
server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory()
 + port);
+      
server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory()
 + port);
+      server.getConfiguration().setJMXManagementEnabled(true);
+      server.getConfiguration().setMessageExpiryScanPeriod(100);
+      return server;
+   }
+
+
+   @Test
+   public void testOneSideDisconnected() throws Exception {
+      ActiveMQServer serverA = createServer(5671, "serverA");
+      ActiveMQServer serverB = createServer(6671, "serverB");
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("A_to_B", 
"tcp://localhost:6671").setReconnectAttempts(-1).setRetryInterval(10);
+         AMQPMirrorBrokerConnectionElement replica1 = new 
AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR).setDurable(true);
+         amqpConnection.addElement(replica1);
+         serverA.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+
+      {
+         AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("B_to_A", 
"tcp://localhost:5680").setReconnectAttempts(-1).setRetryInterval(10);
+         AMQPMirrorBrokerConnectionElement replica1 = new 
AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR).setDurable(true);
+         amqpConnection.addElement(replica1);
+         serverB.getConfiguration().addAMQPConnection(amqpConnection);
+      }
+
+      String queueName = "queue" + RandomUtil.randomString();
+      String divertedQueue = "queue" + RandomUtil.randomString();
+
+      serverA.getConfiguration().addDivertConfiguration(new 
DivertConfiguration().setAddress(queueName).setForwardingAddress(divertedQueue).setExclusive(false).setName("divertOne"));
+      serverB.getConfiguration().addDivertConfiguration(new 
DivertConfiguration().setAddress(queueName).setForwardingAddress(divertedQueue).setExclusive(false).setName("divertOne"));
+
+      serverA.setIdentity("serverA");
+      serverB.setIdentity("serverB");
+      serverA.start();
+      serverB.start();
+      
serverA.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+      
serverA.createQueue(QueueConfiguration.of(divertedQueue).setRoutingType(RoutingType.ANYCAST));
+      try {
+         
serverB.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+         
serverB.createQueue(QueueConfiguration.of(divertedQueue).setRoutingType(RoutingType.ANYCAST));
+      } catch (Exception ignored) {
+      }
+
+      Queue divertedQueueA = serverA.locateQueue(divertedQueue);
+      Queue divertedQueueB = serverB.locateQueue(divertedQueue);
+      Queue queueA = serverA.locateQueue(queueName);
+      Queue queueB = serverB.locateQueue(queueName);
+
+
+      long nmessages = 10;
+
+      ConnectionFactory factoryA = 
CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:5671");
+      try (Connection connection = factoryA.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
+         for (int i = 0; i < nmessages; i++) {
+            producer.send(session.createTextMessage("hello " + i));
+         }
+         session.commit();
+      }
+
+      Wait.assertEquals(nmessages, divertedQueueA::getMessageCount, 5000, 100);
+      Wait.assertEquals(nmessages, divertedQueueB::getMessageCount, 5000, 100);
+      Wait.assertEquals(nmessages, queueA::getMessageCount, 5000, 100);
+      Wait.assertEquals(nmessages, queueB::getMessageCount, 5000, 100);
+
+
+      Queue serverAMirrorSNF = 
serverA.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_A_to_B");
+      Wait.assertEquals(0L, serverAMirrorSNF::getMessageCount, 5000, 100);
+
+      TransportConfiguration newConfig = newAcceptorConfig(5680, 
"lateAcceptor");
+      serverA.getRemotingService().createAcceptor(newConfig);
+      serverA.getRemotingService().getAcceptor("lateAcceptor").start();
+
+      Queue serverBMirrorSNF = 
serverB.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_B_to_A");
+      Wait.assertEquals(0L, serverBMirrorSNF::getMessageCount, 5000, 100);
+
+      try (Connection connection = factoryA.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
+         connection.start();
+         for (int i = 0; i < nmessages; i++) {
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            assertNotNull(message);
+            assertEquals("hello " + i, message.getText());
+         }
+         session.commit();
+         assertNull(consumer.receiveNoWait());
+         consumer.close();
+
+         consumer = session.createConsumer(session.createQueue(divertedQueue));
+         connection.start();
+         for (int i = 0; i < nmessages; i++) {
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            assertNotNull(message);
+            assertEquals("hello " + i, message.getText());
+         }
+         session.commit();
+         assertNull(consumer.receiveNoWait());
+      }
+
+      Wait.assertEquals(0L, divertedQueueA::getMessageCount, 5000, 100);
+      Wait.assertEquals(0L, divertedQueueB::getMessageCount, 5000, 100);
+      Wait.assertEquals(0L, queueA::getMessageCount, 5000, 100);
+      Wait.assertEquals(0L, queueB::getMessageCount, 5000, 100);
+
+   }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to