This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 169923a359 ARTEMIS-5498 Scan cluster topology to detect wrong discovery entries
169923a359 is described below
commit 169923a359a5b85912305673d750e893f2ac872c
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Wed May 28 10:46:03 2025 +0200
ARTEMIS-5498 Scan cluster topology to detect wrong discovery entries
---
.../api/core/DiscoveryGroupConfiguration.java | 10 +++++
.../artemis/api/core/client/ActiveMQClient.java | 2 +
.../core/client/impl/ServerLocatorImpl.java | 2 +-
.../artemis/core/cluster/DiscoveryGroup.java | 6 ++-
.../artemis/core/server/ActiveMQServerLogger.java | 2 +-
.../server/cluster/impl/ClusterConnectionImpl.java | 13 +++---
.../routing/pools/DiscoveryGroupService.java | 2 +-
.../SymmetricClusterWithDiscoveryTest.java | 48 ++++++++++++++++++++++
.../integration/discovery/DiscoveryBaseTest.java | 3 +-
.../tests/integration/discovery/DiscoveryTest.java | 3 +-
10 files changed, 79 insertions(+), 12 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java
index 17bf9bb73b..6fdbd05a40 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java
@@ -43,6 +43,8 @@ public final class DiscoveryGroupConfiguration implements
Serializable {
private long discoveryInitialWaitTimeout =
ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
+ private long stoppingTimeout =
ActiveMQClient.DEFAULT_DISCOVERY_STOPPING_TIMEOUT;
+
// This is the actual object used by the class, it has to be transient so
we can handle deserialization with a 2.2 client
private BroadcastEndpointFactory endpointFactory;
@@ -76,6 +78,14 @@ public final class DiscoveryGroupConfiguration implements
Serializable {
return this;
}
+ public long getStoppingTimeout() {
+ return stoppingTimeout;
+ }
+
+ public void setStoppingTimeout(long stoppingTimeout) {
+ this.stoppingTimeout = stoppingTimeout;
+ }
+
public BroadcastEndpointFactory getBroadcastEndpointFactory() {
return endpointFactory;
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
index 66e4e9fec2..2954942cc4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
@@ -105,6 +105,8 @@ public final class ActiveMQClient {
public static final long DEFAULT_DISCOVERY_REFRESH_TIMEOUT = 10000;
+ public static final long DEFAULT_DISCOVERY_STOPPING_TIMEOUT = 10000;
+
public static final int DEFAULT_DISCOVERY_PORT = 9876;
public static final long DEFAULT_RETRY_INTERVAL = 2000;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 35c518ff4c..0c0960d3be 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -326,7 +326,7 @@ public final class ServerLocatorImpl implements
ServerLocatorInternal, Discovery
private static DiscoveryGroup createDiscoveryGroup(String nodeID,
DiscoveryGroupConfiguration config) throws Exception {
- return new DiscoveryGroup(nodeID, config.getName(),
config.getRefreshTimeout(), config.getBroadcastEndpointFactory(), null);
+ return new DiscoveryGroup(nodeID, config.getName(),
config.getRefreshTimeout(), config.getStoppingTimeout(),
config.getBroadcastEndpointFactory(), null);
}
private ServerLocatorImpl(final Topology topology,
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
index 2a8201e867..3782404c52 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
@@ -80,17 +80,21 @@ public final class DiscoveryGroup implements
ActiveMQComponent {
private final NotificationService notificationService;
+ private final long stoppingTimeout;
+
/**
* This is the main constructor, intended to be used
*/
public DiscoveryGroup(final String nodeID,
final String name,
final long timeout,
+ final long stoppingTimeout,
BroadcastEndpointFactory endpointFactory,
NotificationService service) throws Exception {
this.nodeID = nodeID;
this.name = name;
this.timeout = timeout;
+ this.stoppingTimeout = stoppingTimeout;
this.endpoint = endpointFactory.createBroadcastEndpoint();
this.notificationService = service;
}
@@ -170,7 +174,7 @@ public final class DiscoveryGroup implements
ActiveMQComponent {
try {
if (thread != null) {
thread.interrupt();
- thread.join(10000);
+ thread.join(stoppingTimeout);
if (thread.isAlive()) {
ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 3e1919482e..1c4bcca247 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1497,7 +1497,7 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224143, value = "Bridge {} failed to send {}: {} {}",
level = LogMessage.Level.WARN)
void bridgeFailedToSend(String bridgeName, String message, String
exceptionName, String exceptionMessage);
- @LogMessage(id = 224144, value = "The topology of the cluster connection {}
doesn't include all th expected members. "
+ @LogMessage(id = 224144, value = "The topology of the cluster connection {}
doesn't include all the expected members. "
+ "Check the discovery group or the static connectors of the cluster
connection if the topology is correct: {} / {}", level = LogMessage.Level.WARN)
void incompleteClusterTopology(String clusterConnection, Topology topology,
String topologyMembers);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 7efaa628bf..5216c6221e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -625,18 +625,19 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
// localMember.getConnector().a,
// localMember.getConnector().b);
- if (topologyScanner != null && !stopping) {
- topologyScanner.start();
- topologyScanner.resetCounter();
- topologyScanner.delay();
- }
+ startTopologyScanner();
}
@Override
public void connectorsChanged(List<DiscoveryEntry> newConnectors) {
discoveryEntries = newConnectors;
- if (topologyScanner != null && !stopping && topologyScanner.isStarted())
{
+ startTopologyScanner();
+ }
+
+ private void startTopologyScanner() {
+ if (topologyScanner != null && !stopping) {
+ topologyScanner.start();
topologyScanner.resetCounter();
topologyScanner.delay();
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
index 1e384c9027..b63d3b9024 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
@@ -41,7 +41,7 @@ public class DiscoveryGroupService extends DiscoveryService
implements Discovery
@Override
public void start() throws Exception {
- discoveryGroup = new DiscoveryGroup(localTarget.getNodeID(),
config.getName(), config.getRefreshTimeout(),
config.getBroadcastEndpointFactory(), null);
+ discoveryGroup = new DiscoveryGroup(localTarget.getNodeID(),
config.getName(), config.getRefreshTimeout(), config.getStoppingTimeout(),
config.getBroadcastEndpointFactory(), null);
discoveryGroup.registerListener(this);
discoveryGroup.start();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
index 290c4461c2..bd5919d5e4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
@@ -17,6 +17,8 @@
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import java.io.EOFException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -24,8 +26,11 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.Test;
public class SymmetricClusterWithDiscoveryTest extends SymmetricClusterTest {
@@ -89,6 +94,49 @@ public class SymmetricClusterWithDiscoveryTest extends
SymmetricClusterTest {
doTestStartStopServers(false);
}
+ @Test
+ public void testStartStopServersWithWrongConnectorConfigurations() throws
Exception {
+ try (AssertionLoggerHandler loggerHandler = new
AssertionLoggerHandler()) {
+ setupCluster();
+
+ for (int node = 0; node < 5; node++) {
+ final int serverNode = node;
+
+ // Set wrong connector configurations
+ Map<String, TransportConfiguration> wrongConnectorConfigurations =
new HashMap<>();
+
getServer(node).getConfiguration().getConnectorConfigurations().forEach((key,
transportConfiguration) -> {
+ TransportConfiguration wrongtransportConfiguration = new
TransportConfiguration(
+ transportConfiguration.getFactoryClassName(),
+ new HashMap<>(transportConfiguration.getParams()),
+ transportConfiguration.getName(),
+ new HashMap<>(transportConfiguration.getExtraParams()));
+ if (isNetty()) {
+ wrongtransportConfiguration.getParams().put("port",
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT
+ serverNode + 10);
+ } else {
+ wrongtransportConfiguration.getParams().put("serverId",
String.valueOf(serverNode + 10));
+ }
+ wrongConnectorConfigurations.put(key,
wrongtransportConfiguration);
+ });
+
getServer(node).getConfiguration().setConnectorConfigurations(wrongConnectorConfigurations);
+
+ // Reduce the discovery stopping timeout to speed up the test
+
getServer(node).getConfiguration().getDiscoveryGroupConfigurations().forEach((s,
discoveryGroupConfiguration) ->
discoveryGroupConfiguration.setStoppingTimeout(1));
+
+ // Reduce the topology scanner attempts to speed up the test
+
getServer(node).getConfiguration().getClusterConfigurations().forEach(
+ clusterConnectionConfiguration ->
clusterConnectionConfiguration.setTopologyScannerAttempts(1));
+ }
+
+ startServers();
+
+ validateTopologySize(1, 0, 1, 2, 3, 4);
+
+ Wait.assertTrue(() -> loggerHandler.findText("AMQ224144"));
+
+ stopServers();
+ }
+ }
+
@Override
protected void setupProxy(int nodeId) {
getServer(nodeId).getConfiguration().getDiscoveryGroupConfigurations().get("dg1").
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java
index 54634f0640..2a18b03e03 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import org.apache.activemq.artemis.core.cluster.DiscoveryListener;
@@ -153,7 +154,7 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
final int groupPort,
final long timeout,
NotificationService notif)
throws Exception {
- return new DiscoveryGroup(nodeID, name, timeout, new
UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localBindAddress
!= null ? localBindAddress.getHostAddress() : "localhost"), notif);
+ return new DiscoveryGroup(nodeID, name, timeout,
ActiveMQClient.DEFAULT_DISCOVERY_STOPPING_TIMEOUT, new
UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localBindAddress
!= null ? localBindAddress.getHostAddress() : "localhost"), notif);
}
protected final class FakeNodeManager extends NodeManager {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
index 5041b6467e..447a3808b2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
@@ -36,6 +36,7 @@ import
org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import
org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
@@ -134,7 +135,7 @@ public class DiscoveryTest extends DiscoveryBaseTest {
bg.addConnector(live1);
- dg = new DiscoveryGroup(nodeID + "1", "broadcast", 5000L, new
JGroupsFileBroadcastEndpointFactory().setChannelName("tst").setFile(TEST_JGROUPS_CONF_FILE),
null);
+ dg = new DiscoveryGroup(nodeID + "1", "broadcast", 5000L,
ActiveMQClient.DEFAULT_DISCOVERY_STOPPING_TIMEOUT, new
JGroupsFileBroadcastEndpointFactory().setChannelName("tst").setFile(TEST_JGROUPS_CONF_FILE),
null);
dg.start();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact