This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new efe450298d ARTEMIS-4527 - Redistributor race when consumerCount reaches 0 in cluster
efe450298d is described below

commit efe450298dde7f9be0676495907f2746072a68d8
Author: a181321 <[email protected]>
AuthorDate: Tue Jan 30 13:58:01 2024 +0100

    ARTEMIS-4527 - Redistributor race when consumerCount reaches 0 in cluster
---
 .../core/postoffice/impl/PostOfficeImpl.java       |  16 +-
 .../cluster/impl/ClusterConnectionBridge.java      |   8 +-
 .../cluster/impl/RemoteQueueBindingImpl.java       |   7 +-
 tests/soak-tests/pom.xml                           |  17 ++
 .../ClusterNotificationsContinuityTest.java        | 256 +++++++++++++++++++++
 tests/soak-tests/src/test/scripts/parameters.sh    |   6 +
 6 files changed, 302 insertions(+), 8 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 9cf8b61908..51ad8e7651 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -334,7 +334,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
 
                if (distance < 1) {
                   //Binding added locally. If a matching remote binding with 
consumers exist, add a redistributor
-                  Binding binding = getBinding(routingName);
+                  Binding binding = addressManager.getBinding(routingName);
 
                   if (binding != null) {
 
@@ -435,7 +435,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
                   }
 
                   SimpleString addressName = 
props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
-                  Binding binding = 
getBinding(CompositeAddress.isFullyQualified(addressName) ? addressName : 
queueName);
+                  Binding binding = 
addressManager.getBinding(CompositeAddress.isFullyQualified(addressName) ? 
addressName : queueName);
 
                   if (binding != null) {
                      // We have a local queue
@@ -496,7 +496,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
                         return;
                      }
 
-                     Binding binding = getBinding(queueName);
+                     Binding binding = addressManager.getBinding(queueName);
 
                      if (binding == null) {
                         logger.debug("PostOffice notification / 
CONSUMER_CLOSED: Could not find queue {}", queueName);
@@ -636,7 +636,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
       String delimiter = 
server.getConfiguration().getWildcardConfiguration().getDelimiterString();
 
       SimpleString internalDivertName = 
ResourceNames.getRetroactiveResourceDivertName(prefix, delimiter, address);
-      if (getBinding(internalDivertName) != null) {
+      if (addressManager.getBinding(internalDivertName) != null) {
          server.destroyDivert(internalDivertName, true);
       }
 
@@ -1081,7 +1081,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
    }
 
    @Override
-   public Binding getBinding(final SimpleString name) {
+   public synchronized Binding getBinding(final SimpleString name) {
       return addressManager.getBinding(name);
    }
 
@@ -1620,6 +1620,9 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
       final Transaction tx = context.getTransaction();
 
       final Long deliveryTime;
+
+      boolean containsDurables = false;
+
       if (message.hasScheduledDeliveryTime()) {
          deliveryTime = message.getScheduledDeliveryTime();
       } else {
@@ -1658,6 +1661,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
          final List<Queue> durableQueues = entry.getValue().getDurableQueues();
          if (!durableQueues.isEmpty()) {
             processRouteToDurableQueues(message, context, deliveryTime, tx, 
durableQueues, refs);
+            containsDurables = true;
          }
       }
 
@@ -1669,6 +1673,8 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
 
       if (tx != null) {
          tx.addOperation(new AddOperation(refs));
+      } else if (!containsDurables) {
+         processReferences(refs, direct);
       } else {
          // This will use the same thread if there are no pending operations
          // avoiding a context switch on this case
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 2bd045500d..22a8bdb476 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -280,7 +281,12 @@ public class ClusterConnectionBridge extends BridgeImpl {
                                                    
createPermissiveManagementNotificationToFilter() +
                                                    ")");
 
-         sessionConsumer.createQueue(new 
QueueConfiguration(notifQueueName).setAddress(managementNotificationAddress).setFilterString(filter).setDurable(false).setTemporary(true));
+         sessionConsumer.createQueue(new QueueConfiguration(notifQueueName)
+                                        
.setAddress(managementNotificationAddress)
+                                        .setFilterString(filter)
+                                        .setDurable(false)
+                                        .setTemporary(true)
+                                        
.setRoutingType(RoutingType.MULTICAST));
 
          notifConsumer = sessionConsumer.createConsumer(notifQueueName);
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index 992b909ff8..5680d7aa54 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -156,7 +156,7 @@ public class RemoteQueueBindingImpl implements 
RemoteQueueBinding {
 
    @Override
    public synchronized boolean isHighAcceptPriority(final Message message) {
-      if (consumerCount == 0 || 
messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
+      if (consumerCount <= 0 || 
messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
          return false;
       }
 
@@ -242,7 +242,10 @@ public class RemoteQueueBindingImpl implements 
RemoteQueueBinding {
          }
       }
 
-      consumerCount--;
+      if (--consumerCount < 0) {
+         consumerCount = 0;
+      }
+
    }
 
    @Override
diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml
index 70aebe5297..ed669afd00 100644
--- a/tests/soak-tests/pom.xml
+++ b/tests/soak-tests/pom.xml
@@ -169,6 +169,23 @@
          <artifactId>jakarta.json-api</artifactId>
          <scope>test</scope>
       </dependency>
+
+      <dependency>
+         <groupId>org.springframework</groupId>
+         <artifactId>spring-core</artifactId>
+         <scope>test</scope>
+      </dependency>
+      <dependency>
+         <groupId>org.springframework</groupId>
+         <artifactId>spring-context</artifactId>
+         <scope>test</scope>
+      </dependency>
+      <dependency>
+         <groupId>org.springframework</groupId>
+         <artifactId>spring-jms</artifactId>
+         <scope>test</scope>
+      </dependency>
+
    </dependencies>
 
    <build>
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clusterNotificationsContinuity/ClusterNotificationsContinuityTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clusterNotificationsContinuity/ClusterNotificationsContinuityTest.java
new file mode 100644
index 0000000000..09bffd1aef
--- /dev/null
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clusterNotificationsContinuity/ClusterNotificationsContinuityTest.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.clusterNotificationsContinuity;
+
+import javax.jms.Connection;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+import static org.apache.activemq.artemis.utils.TestParameters.testProperty;
+
+/**
+ * Refer to ./scripts/parameters.sh for suggested parameters
+ *
+ * Tests for an issue that only shows up under high overall system load.
+ * The following parameters tune the resource demands of the test:
+ * NUMBER_OF_SERVERS, NUMBER_OF_QUEUES, NUMBER_OF_WORKERS
+ *
+ */
+
+public class ClusterNotificationsContinuityTest extends SoakTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   /** Prefix of every broker instance name; the broker index is appended to it. */
+   public static final String SERVER_NAME_BASE = "clusterNotifications/cncBroker-";
+   /** Port of broker 0; broker N is addressed on {@code SERVER_PORT_BASE + N}. */
+   public static final int SERVER_PORT_BASE = 61616;
+   private static final String TEST_NAME = "CLUSTER_NOTIFICATIONS_CONTINUITY";
+   private static final boolean TEST_ENABLED = Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true"));
+
+   private static final int NUMBER_OF_SERVERS = testProperty(TEST_NAME, "NUMBER_OF_SERVERS", 3);
+   private static final int NUMBER_OF_QUEUES = testProperty(TEST_NAME, "NUMBER_OF_QUEUES", 200);
+   private static final int NUMBER_OF_WORKERS = testProperty(TEST_NAME, "NUMBER_OF_WORKERS", 10);
+   private static final String QUEUE_NAME_PREFIX = "TEST.QUEUE.";
+   // URL of broker 0, which both the producers and the spawned listener client connect to.
+   private static final String FIRST_BROKER_URL = "tcp://localhost:" + SERVER_PORT_BASE;
+   // One slot per broker; a slot stays null until before() has started that broker.
+   private final Process[] serverProcesses = new Process[NUMBER_OF_SERVERS];
+   private Process dmlcProcess;
+
+   /**
+    * Creates a fresh broker instance for every index below NUMBER_OF_SERVERS.
+    * Any previous instance directory is deleted first. Each broker is created
+    * clustered, with a static connector list naming all other brokers, message
+    * load balancing OFF_WITH_REDISTRIBUTION, and a broker.properties file that
+    * sets {@code addressesSettings.#.redistributionDelay} to 0.
+    *
+    * @throws Exception if an instance cannot be created or its properties cannot be saved
+    */
+   @BeforeClass
+   public static void createServers() throws Exception {
+      for (int s = 0; s < NUMBER_OF_SERVERS; s++) {
+         String serverName = SERVER_NAME_BASE + s;
+         String staticClusterURI = buildStaticClusterURI(s);
+
+         File serverLocation = getFileServerLocation(serverName);
+         deleteDirectory(serverLocation);
+
+         HelperCreate cliCreateServer = new HelperCreate();
+         cliCreateServer
+            .setRole("amq")
+            .setUser("admin")
+            .setPassword("admin")
+            .setAllowAnonymous(true)
+            .setNoWeb(true)
+            .setArtemisInstance(serverLocation)
+            .setPortOffset(s)
+            .setClustered(true);
+
+         cliCreateServer.setMessageLoadBalancing("OFF_WITH_REDISTRIBUTION");
+         cliCreateServer.setStaticCluster(staticClusterURI);
+         cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1");
+
+         cliCreateServer.createServer();
+
+         Properties brokerProperties = new Properties();
+         brokerProperties.put("addressesSettings.#.redistributionDelay", "0");
+
+         File brokerPropertiesFile = new File(serverLocation, "broker.properties");
+         saveProperties(brokerProperties, brokerPropertiesFile);
+      }
+   }
+
+   /**
+    * Builds the comma-separated list of {@code tcp://localhost:<port>} URLs of
+    * every broker except the one at index {@code self}.
+    */
+   private static String buildStaticClusterURI(int self) {
+      // StringBuilder is sufficient: the buffer never leaves this method.
+      StringBuilder uri = new StringBuilder();
+      for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
+         if (i == self) {
+            continue;
+         }
+         if (uri.length() > 0) {
+            uri.append(",");
+         }
+         uri.append("tcp://localhost:").append(SERVER_PORT_BASE + i);
+      }
+      return uri.toString();
+   }
+
+   /**
+    * Skips the test when TEST_ENABLED is false; otherwise cleans each broker's
+    * data, starts every broker, and waits until each one reports a network
+    * topology of NUMBER_OF_SERVERS members.
+    *
+    * @throws Exception if a broker fails to start or the topology does not form in time
+    */
+   @Before
+   public void before() throws Exception {
+      Assume.assumeTrue(TEST_ENABLED);
+      for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
+         String serverName = SERVER_NAME_BASE + i;
+
+         cleanupData(serverName);
+         File brokerPropertiesFile = new File(getServerLocation(serverName), "broker.properties");
+         serverProcesses[i] = startServer(serverName, 0, 0, brokerPropertiesFile);
+      }
+
+      for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
+         ServerUtil.waitForServerToStart(i, 10_000);
+         // NOTE(review): this SimpleManagement is never closed here - confirm whether it holds a connection.
+         SimpleManagement simpleManagement = new SimpleManagement("tcp://localhost:" + (SERVER_PORT_BASE + i), null, null);
+         Wait.assertEquals(NUMBER_OF_SERVERS, () -> simpleManagement.listNetworkTopology().size(), 5000);
+      }
+   }
+
+   /**
+    * Spawns this class's {@link #main(String[])} in a separate VM as the
+    * consuming side, sends one transacted message to each of NUMBER_OF_QUEUES
+    * queues on broker 0 from a pool of NUMBER_OF_WORKERS threads, waits up to
+    * 30 seconds for the spawned VM to exit, and finally checks every broker's
+    * artemis.log for the log code AMQ224037.
+    *
+    * @throws Throwable if the test is interrupted or the log check fails
+    */
+   @Test
+   public void testClusterNotificationsContinuity() throws Throwable {
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(FIRST_BROKER_URL);
+      runAfter(factory::close);
+
+      CountDownLatch latch = new CountDownLatch(NUMBER_OF_QUEUES);
+      ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_WORKERS);
+      runAfter(executorService::shutdownNow);
+
+      //run dmlc in spawned process to more easily manage its lifecycle
+      dmlcProcess = SpawnedVMSupport.spawnVM("org.apache.activemq.artemis.tests.soak.clusterNotificationsContinuity.ClusterNotificationsContinuityTest");
+      runAfter(dmlcProcess::destroyForcibly);
+
+      for (int i = 0; i < NUMBER_OF_QUEUES; i++) {
+         Queue queue = ActiveMQDestination.createQueue(QUEUE_NAME_PREFIX + i);
+
+         executorService.execute(() -> {
+            try (Connection connection = factory.createConnection();
+                 Session session = connection.createSession(Session.SESSION_TRANSACTED)) {
+
+               logger.debug("Sending message to queue: {}", queue.getQueueName());
+               session.createProducer(queue).send(session.createTextMessage("Message"));
+               session.commit();
+
+               latch.countDown();
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         });
+      }
+
+      // A timeout here is not a failure on its own (the verdict comes from the
+      // log check below), but it is reported instead of being dropped silently.
+      if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
+         logger.warn("Timed out waiting for producers: {} of {} sends still pending", latch.getCount(), NUMBER_OF_QUEUES);
+      }
+
+      if (!dmlcProcess.waitFor(30_000, TimeUnit.MILLISECONDS)) {
+         dmlcProcess.destroyForcibly();
+      }
+
+      for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
+         String serverName = SERVER_NAME_BASE + i;
+
+         File artemisLog = new File("target/" + serverName + "/log/artemis.log");
+         // presumably 'false' means the record must be absent - verify against SoakTestBase.checkLogRecord
+         checkLogRecord(artemisLog, false, "AMQ224037");
+      }
+
+   }
+
+   /**
+    * Kills any spawned VM, stops every broker that was actually started, and
+    * cleans each broker's data.
+    */
+   @After
+   public void cleanup() {
+      SpawnedVMSupport.forceKill();
+
+      for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
+         // JUnit runs @After even when before() aborted (assumption failure or
+         // a broker that did not start), so a slot may still be null.
+         Process serverProcess = serverProcesses[i];
+         if (serverProcess != null) {
+            serverProcess.destroy();
+         }
+         cleanupData(SERVER_NAME_BASE + i);
+      }
+   }
+
+   /**
+    * Entry point of the spawned VM: runs the listener-container client.
+    *
+    * @param args ignored
+    * @throws Exception if the client is interrupted while waiting
+    */
+   public static void main(String[] args) throws Exception {
+      ClusterNotificationsContinuityTest cncTest = new ClusterNotificationsContinuityTest();
+      cncTest.runDMLCClient();
+   }
+
+   /**
+    * Starts one DefaultMessageListenerContainer per queue against broker 0,
+    * each with NUMBER_OF_WORKERS concurrent consumers. Waits up to 10 seconds
+    * for one message per queue, then stops and shuts down all containers
+    * concurrently.
+    */
+   private void runDMLCClient() throws Exception {
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(FIRST_BROKER_URL);
+      // NOTE(review): reached via main(), outside any JUnit lifecycle - confirm this callback is ever invoked.
+      runAfter(factory::close);
+
+      CountDownLatch latch = new CountDownLatch(NUMBER_OF_QUEUES);
+      List<DefaultMessageListenerContainer> containers = new ArrayList<>();
+
+      for (int i = 0; i < NUMBER_OF_QUEUES; i++) {
+         try {
+            String queueName = QUEUE_NAME_PREFIX + i;
+
+            DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
+            container.setCacheLevelName("CACHE_NONE");
+            container.setSessionTransacted(true);
+            container.setSessionAcknowledgeModeName("SESSION_TRANSACTED");
+            container.setConcurrentConsumers(NUMBER_OF_WORKERS);
+            container.setConnectionFactory(new CachingConnectionFactory(factory));
+            container.setDestinationName(queueName);
+            container.setReceiveTimeout(100);
+            container.setMessageListener((MessageListener) msg -> {
+               logger.debug("Message received on queue: {} ", queueName);
+               latch.countDown();
+            });
+
+            container.initialize();
+            container.start();
+            containers.add(container);
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         }
+      }
+
+      // Best effort: shutdown proceeds whether or not every message arrived.
+      latch.await(10_000, TimeUnit.MILLISECONDS);
+
+      // The parallel stream is kept from the original so containers are stopped concurrently.
+      containers.parallelStream().forEach(this::stopContainer);
+
+   }
+
+   /**
+    * Stops one listener container, waits for its active consumer count to
+    * reach zero, then shuts it down and destroys its caching connection factory.
+    * Failures are logged so the remaining containers are still processed.
+    */
+   private void stopContainer(DefaultMessageListenerContainer dmlc) {
+      try {
+         dmlc.stop();
+         Wait.waitFor(() -> dmlc.getActiveConsumerCount() == 0);
+         dmlc.shutdown();
+         ((CachingConnectionFactory) dmlc.getConnectionFactory()).destroy();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+}
diff --git a/tests/soak-tests/src/test/scripts/parameters.sh 
b/tests/soak-tests/src/test/scripts/parameters.sh
index 8f443c681a..62908b885b 100755
--- a/tests/soak-tests/src/test/scripts/parameters.sh
+++ b/tests/soak-tests/src/test/scripts/parameters.sh
@@ -136,3 +136,9 @@ export TEST_CLIENT_FAILURE_OPENWIRE_TOTAL_ITERATION=2
 export TEST_CLIENT_FAILURE_OPENWIRE_NUMBER_OF_VMS=5
 export TEST_CLIENT_FAILURE_OPENWIRE_NUMBER_OF_MESSAGES=20000
 export TEST_CLIENT_FAILURE_OPENWIRE_MEMORY_CLIENT=-Xmx256m
+
+#clusterNotificationsContinuityTest
+export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_TEST_ENABLED=true
+export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_SERVERS=3
+export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200
+export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10
\ No newline at end of file

Reply via email to