This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new efe450298d ARTEMIS-4527 - Redistributor race when consumerCount reaches 0 in cluster
efe450298d is described below
commit efe450298dde7f9be0676495907f2746072a68d8
Author: a181321 <[email protected]>
AuthorDate: Tue Jan 30 13:58:01 2024 +0100
ARTEMIS-4527 - Redistributor race when consumerCount reaches 0 in cluster
---
.../core/postoffice/impl/PostOfficeImpl.java | 16 +-
.../cluster/impl/ClusterConnectionBridge.java | 8 +-
.../cluster/impl/RemoteQueueBindingImpl.java | 7 +-
tests/soak-tests/pom.xml | 17 ++
.../ClusterNotificationsContinuityTest.java | 256 +++++++++++++++++++++
tests/soak-tests/src/test/scripts/parameters.sh | 6 +
6 files changed, 302 insertions(+), 8 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 9cf8b61908..51ad8e7651 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -334,7 +334,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (distance < 1) {
//Binding added locally. If a matching remote binding with
consumers exist, add a redistributor
- Binding binding = getBinding(routingName);
+ Binding binding = addressManager.getBinding(routingName);
if (binding != null) {
@@ -435,7 +435,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
SimpleString addressName =
props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
- Binding binding =
getBinding(CompositeAddress.isFullyQualified(addressName) ? addressName :
queueName);
+ Binding binding =
addressManager.getBinding(CompositeAddress.isFullyQualified(addressName) ?
addressName : queueName);
if (binding != null) {
// We have a local queue
@@ -496,7 +496,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return;
}
- Binding binding = getBinding(queueName);
+ Binding binding = addressManager.getBinding(queueName);
if (binding == null) {
logger.debug("PostOffice notification /
CONSUMER_CLOSED: Could not find queue {}", queueName);
@@ -636,7 +636,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
String delimiter =
server.getConfiguration().getWildcardConfiguration().getDelimiterString();
SimpleString internalDivertName =
ResourceNames.getRetroactiveResourceDivertName(prefix, delimiter, address);
- if (getBinding(internalDivertName) != null) {
+ if (addressManager.getBinding(internalDivertName) != null) {
server.destroyDivert(internalDivertName, true);
}
@@ -1081,7 +1081,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
@Override
- public Binding getBinding(final SimpleString name) {
+ public synchronized Binding getBinding(final SimpleString name) {
return addressManager.getBinding(name);
}
@@ -1620,6 +1620,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
final Transaction tx = context.getTransaction();
final Long deliveryTime;
+
+ boolean containsDurables = false;
+
if (message.hasScheduledDeliveryTime()) {
deliveryTime = message.getScheduledDeliveryTime();
} else {
@@ -1658,6 +1661,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
final List<Queue> durableQueues = entry.getValue().getDurableQueues();
if (!durableQueues.isEmpty()) {
processRouteToDurableQueues(message, context, deliveryTime, tx,
durableQueues, refs);
+ containsDurables = true;
}
}
@@ -1669,6 +1673,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (tx != null) {
tx.addOperation(new AddOperation(refs));
+ } else if (!containsDurables) {
+ processReferences(refs, direct);
} else {
// This will use the same thread if there are no pending operations
// avoiding a context switch on this case
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 2bd045500d..22a8bdb476 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -280,7 +281,12 @@ public class ClusterConnectionBridge extends BridgeImpl {
createPermissiveManagementNotificationToFilter() +
")");
- sessionConsumer.createQueue(new QueueConfiguration(notifQueueName).setAddress(managementNotificationAddress).setFilterString(filter).setDurable(false).setTemporary(true));
+ sessionConsumer.createQueue(new QueueConfiguration(notifQueueName)
+ .setAddress(managementNotificationAddress)
+ .setFilterString(filter)
+ .setDurable(false)
+ .setTemporary(true)
+ .setRoutingType(RoutingType.MULTICAST));
notifConsumer = sessionConsumer.createConsumer(notifQueueName);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
index 992b909ff8..5680d7aa54 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java
@@ -156,7 +156,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
@Override
public synchronized boolean isHighAcceptPriority(final Message message) {
- if (consumerCount == 0 ||
messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
+ if (consumerCount <= 0 ||
messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
return false;
}
@@ -242,7 +242,10 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding {
}
}
- consumerCount--;
+ if (--consumerCount < 0) {
+ consumerCount = 0;
+ }
+
}
@Override
diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml
index 70aebe5297..ed669afd00 100644
--- a/tests/soak-tests/pom.xml
+++ b/tests/soak-tests/pom.xml
@@ -169,6 +169,23 @@
<artifactId>jakarta.json-api</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jms</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clusterNotificationsContinuity/ClusterNotificationsContinuityTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clusterNotificationsContinuity/ClusterNotificationsContinuityTest.java
new file mode 100644
index 0000000000..09bffd1aef
--- /dev/null
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clusterNotificationsContinuity/ClusterNotificationsContinuityTest.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.clusterNotificationsContinuity;
+
+import javax.jms.Connection;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+import static org.apache.activemq.artemis.utils.TestParameters.testProperty;
+
+/**
+ * Refer to ./scripts/parameters.sh for suggested parameters
+ *
+ * Tests for an issue that's dependent on high overall system load.
+ * The following parameters are used to tune the resource demands of the test:
+ * NUMBER_OF_SERVERS, NUMBER_OF_QUEUES, NUMBER_OF_CONSUMERS
+ *
+ */
+
+public class ClusterNotificationsContinuityTest extends SoakTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final String SERVER_NAME_BASE =
"clusterNotifications/cncBroker-";
+ public static final int SERVER_PORT_BASE = 61616;
+ private static final String TEST_NAME = "CLUSTER_NOTIFICATIONS_CONTINUITY";
+ private static final boolean TEST_ENABLED =
Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true"));
+
+ private static final int NUMBER_OF_SERVERS = testProperty(TEST_NAME,
"NUMBER_OF_SERVERS", 3);
+ private static final int NUMBER_OF_QUEUES = testProperty(TEST_NAME,
"NUMBER_OF_QUEUES", 200);
+ private static final int NUMBER_OF_WORKERS = testProperty(TEST_NAME,
"NUMBER_OF_WORKERS", 10);
+ private static final String QUEUE_NAME_PREFIX = "TEST.QUEUE.";
+ private final Process[] serverProcesses = new Process[NUMBER_OF_SERVERS];
+ private Process dmlcProcess;
+
+ @BeforeClass
+ public static void createServers() throws Exception {
+ for (int s = 0; s < NUMBER_OF_SERVERS; s++) {
+ String serverName = SERVER_NAME_BASE + s;
+
+ String staticClusterURI;
+
+ {
+ StringBuffer urlBuffer = new StringBuffer();
+ boolean first = true;
+ for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
+ if (i != s) {
+ if (!first) {
+ urlBuffer.append(",");
+ }
+ first = false;
+ urlBuffer.append("tcp://localhost:" + (SERVER_PORT_BASE +
i));
+ }
+ }
+
+ staticClusterURI = urlBuffer.toString();
+ }
+
+ File serverLocation = getFileServerLocation(serverName);
+ deleteDirectory(serverLocation);
+
+ HelperCreate cliCreateServer = new HelperCreate();
+ cliCreateServer
+ .setRole("amq")
+ .setUser("admin")
+ .setPassword("admin")
+ .setAllowAnonymous(true)
+ .setNoWeb(true)
+ .setArtemisInstance(serverLocation)
+ .setPortOffset(s)
+ .setClustered(true);
+
+ cliCreateServer.setMessageLoadBalancing("OFF_WITH_REDISTRIBUTION");
+ cliCreateServer.setStaticCluster(staticClusterURI);
+ cliCreateServer.setArgs("--no-stomp-acceptor",
"--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor",
"--max-hops", "1");
+
+ cliCreateServer.createServer();
+
+ Properties brokerProperties = new Properties();
+ brokerProperties.put("addressesSettings.#.redistributionDelay", "0");
+
+ File brokerPropertiesFile = new File(serverLocation,
"broker.properties");
+ saveProperties(brokerProperties, brokerPropertiesFile);
+ }
+ }
+
+ @Before
+ public void before() throws Exception {
+ Assume.assumeTrue(TEST_ENABLED);
+ for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
+ String serverName = SERVER_NAME_BASE + i;
+
+ cleanupData(serverName);
+ File brokerPropertiesFile = new File(getServerLocation(serverName),
"broker.properties");
+ serverProcesses[i] = startServer(serverName, 0, 0,
brokerPropertiesFile);
+ }
+
+ for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
+ ServerUtil.waitForServerToStart(i, 10_000);
+ SimpleManagement simpleManagement = new
SimpleManagement("tcp://localhost:" + (SERVER_PORT_BASE + i), null, null);
+ Wait.assertEquals(NUMBER_OF_SERVERS, () ->
simpleManagement.listNetworkTopology().size(), 5000);
+ }
+ }
+
+ @Test
+ public void testClusterNotificationsContinuity() throws Throwable {
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
+ runAfter(factory::close);
+
+ CountDownLatch latch = new CountDownLatch(NUMBER_OF_QUEUES);
+ ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_OF_WORKERS);
+ runAfter(executorService::shutdownNow);
+
+ //run dmlc in spawned process to more easily manage its lifecycle
+ dmlcProcess =
SpawnedVMSupport.spawnVM("org.apache.activemq.artemis.tests.soak.clusterNotificationsContinuity.ClusterNotificationsContinuityTest");
+ runAfter(dmlcProcess::destroyForcibly);
+
+ for (int i = 0; i < NUMBER_OF_QUEUES; i++) {
+ Queue queue = ActiveMQDestination.createQueue(QUEUE_NAME_PREFIX + i);
+
+ executorService.execute(() -> {
+ try (Connection connection = factory.createConnection();
+ Session session =
connection.createSession(Session.SESSION_TRANSACTED)) {
+
+ logger.debug("Sending message to queue: {}",
queue.getQueueName());
+
session.createProducer(queue).send(session.createTextMessage("Message"));
+ session.commit();
+
+ latch.countDown();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ });
+ }
+
+ latch.await(5000, TimeUnit.MILLISECONDS);
+
+ if (!dmlcProcess.waitFor(30_000, TimeUnit.MILLISECONDS)) {
+ dmlcProcess.destroyForcibly();
+ }
+
+ for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
+ String serverName = SERVER_NAME_BASE + i;
+
+ File artemisLog = new File("target/" + serverName +
"/log/artemis.log");
+ checkLogRecord(artemisLog, false, "AMQ224037");
+ }
+
+ }
+
+ @After
+ public void cleanup() {
+ SpawnedVMSupport.forceKill();
+
+ for (int i = 0; i < NUMBER_OF_SERVERS; i++) {
+ serverProcesses[i].destroy();
+ String serverName = SERVER_NAME_BASE + i;
+ cleanupData(serverName);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ ClusterNotificationsContinuityTest cncTest = new
ClusterNotificationsContinuityTest();
+ cncTest.runDMLCClient();
+ }
+
+ private void runDMLCClient() throws Exception {
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
+ runAfter(factory::close);
+
+ CountDownLatch latch = new CountDownLatch(NUMBER_OF_QUEUES);
+ List<DefaultMessageListenerContainer> containers = new ArrayList<>();
+
+ for (int i = 0; i < NUMBER_OF_QUEUES; i++) {
+ try {
+ String queueName = QUEUE_NAME_PREFIX + i;
+
+ DefaultMessageListenerContainer container = new
DefaultMessageListenerContainer();
+ container.setCacheLevelName("CACHE_NONE");
+ container.setSessionTransacted(true);
+ container.setSessionAcknowledgeModeName("SESSION_TRANSACTED");
+ container.setConcurrentConsumers(NUMBER_OF_WORKERS);
+ container.setConnectionFactory(new
CachingConnectionFactory(factory));
+ container.setDestinationName(queueName);
+ container.setReceiveTimeout(100);
+ container.setMessageListener((MessageListener) msg -> {
+ logger.debug("Message received on queue: {} ", queueName);
+ latch.countDown();
+ });
+
+ container.initialize();
+ container.start();
+ containers.add(container);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ latch.await(10_000, TimeUnit.MILLISECONDS);
+
+ containers.parallelStream().forEach(dmlc -> {
+ try {
+ dmlc.stop();
+ Wait.waitFor(() -> dmlc.getActiveConsumerCount() == 0);
+ dmlc.shutdown();
+ ((CachingConnectionFactory) dmlc.getConnectionFactory()).destroy();
+ } catch (Exception ignore) {
+ }
+ });
+
+ }
+
+}
diff --git a/tests/soak-tests/src/test/scripts/parameters.sh b/tests/soak-tests/src/test/scripts/parameters.sh
index 8f443c681a..62908b885b 100755
--- a/tests/soak-tests/src/test/scripts/parameters.sh
+++ b/tests/soak-tests/src/test/scripts/parameters.sh
@@ -136,3 +136,9 @@ export TEST_CLIENT_FAILURE_OPENWIRE_TOTAL_ITERATION=2
export TEST_CLIENT_FAILURE_OPENWIRE_NUMBER_OF_VMS=5
export TEST_CLIENT_FAILURE_OPENWIRE_NUMBER_OF_MESSAGES=20000
export TEST_CLIENT_FAILURE_OPENWIRE_MEMORY_CLIENT=-Xmx256m
+
+#clusterNotificationsContinuityTest
+export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_TEST_ENABLED=true
+export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_SERVERS=3
+export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200
+export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10
\ No newline at end of file