This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 169923a359 ARTEMIS-5498 Scan cluster topology to detect wrong discovery entries
169923a359 is described below

commit 169923a359a5b85912305673d750e893f2ac872c
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Wed May 28 10:46:03 2025 +0200

    ARTEMIS-5498 Scan cluster topology to detect wrong discovery entries
---
 .../api/core/DiscoveryGroupConfiguration.java      | 10 +++++
 .../artemis/api/core/client/ActiveMQClient.java    |  2 +
 .../core/client/impl/ServerLocatorImpl.java        |  2 +-
 .../artemis/core/cluster/DiscoveryGroup.java       |  6 ++-
 .../artemis/core/server/ActiveMQServerLogger.java  |  2 +-
 .../server/cluster/impl/ClusterConnectionImpl.java | 13 +++---
 .../routing/pools/DiscoveryGroupService.java       |  2 +-
 .../SymmetricClusterWithDiscoveryTest.java         | 48 ++++++++++++++++++++++
 .../integration/discovery/DiscoveryBaseTest.java   |  3 +-
 .../tests/integration/discovery/DiscoveryTest.java |  3 +-
 10 files changed, 79 insertions(+), 12 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java
index 17bf9bb73b..6fdbd05a40 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java
@@ -43,6 +43,8 @@ public final class DiscoveryGroupConfiguration implements Serializable {
 
    private long discoveryInitialWaitTimeout = 
ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
 
+   private long stoppingTimeout = 
ActiveMQClient.DEFAULT_DISCOVERY_STOPPING_TIMEOUT;
+
    // This is the actual object used by the class, it has to be transient so 
we can handle deserialization with a 2.2 client
    private BroadcastEndpointFactory endpointFactory;
 
@@ -76,6 +78,14 @@ public final class DiscoveryGroupConfiguration implements 
Serializable {
       return this;
    }
 
+   public long getStoppingTimeout() {
+      return stoppingTimeout;
+   }
+
+   public void setStoppingTimeout(long stoppingTimeout) {
+      this.stoppingTimeout = stoppingTimeout;
+   }
+
    public BroadcastEndpointFactory getBroadcastEndpointFactory() {
       return endpointFactory;
    }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
index 66e4e9fec2..2954942cc4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
@@ -105,6 +105,8 @@ public final class ActiveMQClient {
 
    public static final long DEFAULT_DISCOVERY_REFRESH_TIMEOUT = 10000;
 
+   public static final long DEFAULT_DISCOVERY_STOPPING_TIMEOUT = 10000;
+
    public static final int DEFAULT_DISCOVERY_PORT = 9876;
 
    public static final long DEFAULT_RETRY_INTERVAL = 2000;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 35c518ff4c..0c0960d3be 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -326,7 +326,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
    private static DiscoveryGroup createDiscoveryGroup(String nodeID,
                                                       
DiscoveryGroupConfiguration config) throws Exception {
-      return new DiscoveryGroup(nodeID, config.getName(), 
config.getRefreshTimeout(), config.getBroadcastEndpointFactory(), null);
+      return new DiscoveryGroup(nodeID, config.getName(), 
config.getRefreshTimeout(), config.getStoppingTimeout(), 
config.getBroadcastEndpointFactory(), null);
    }
 
    private ServerLocatorImpl(final Topology topology,
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
index 2a8201e867..3782404c52 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
@@ -80,17 +80,21 @@ public final class DiscoveryGroup implements ActiveMQComponent {
 
    private final NotificationService notificationService;
 
+   private final long stoppingTimeout;
+
    /**
     * This is the main constructor, intended to be used
     */
    public DiscoveryGroup(final String nodeID,
                          final String name,
                          final long timeout,
+                         final long stoppingTimeout,
                          BroadcastEndpointFactory endpointFactory,
                          NotificationService service) throws Exception {
       this.nodeID = nodeID;
       this.name = name;
       this.timeout = timeout;
+      this.stoppingTimeout = stoppingTimeout;
       this.endpoint = endpointFactory.createBroadcastEndpoint();
       this.notificationService = service;
    }
@@ -170,7 +174,7 @@ public final class DiscoveryGroup implements 
ActiveMQComponent {
       try {
          if (thread != null) {
             thread.interrupt();
-            thread.join(10000);
+            thread.join(stoppingTimeout);
             if (thread.isAlive()) {
                ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery();
             }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 3e1919482e..1c4bcca247 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1497,7 +1497,7 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 224143, value = "Bridge {} failed to send {}: {} {}", 
level = LogMessage.Level.WARN)
    void bridgeFailedToSend(String bridgeName, String message, String 
exceptionName, String exceptionMessage);
 
-   @LogMessage(id = 224144, value = "The topology of the cluster connection {} 
doesn't include all th expected members. "
+   @LogMessage(id = 224144, value = "The topology of the cluster connection {} 
doesn't include all the expected members. "
        + "Check the discovery group or the static connectors of the cluster 
connection if the topology is correct: {} / {}", level = LogMessage.Level.WARN)
    void incompleteClusterTopology(String clusterConnection, Topology topology, 
String topologyMembers);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 7efaa628bf..5216c6221e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -625,18 +625,19 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
       // localMember.getConnector().a,
       // localMember.getConnector().b);
 
-      if (topologyScanner != null && !stopping) {
-         topologyScanner.start();
-         topologyScanner.resetCounter();
-         topologyScanner.delay();
-      }
+      startTopologyScanner();
    }
 
    @Override
    public void connectorsChanged(List<DiscoveryEntry> newConnectors) {
       discoveryEntries = newConnectors;
 
-      if (topologyScanner != null && !stopping && topologyScanner.isStarted()) 
{
+      startTopologyScanner();
+   }
+
+   private void startTopologyScanner() {
+      if (topologyScanner != null && !stopping) {
+         topologyScanner.start();
          topologyScanner.resetCounter();
          topologyScanner.delay();
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
index 1e384c9027..b63d3b9024 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/pools/DiscoveryGroupService.java
@@ -41,7 +41,7 @@ public class DiscoveryGroupService extends DiscoveryService implements Discovery
 
    @Override
    public void start() throws Exception {
-      discoveryGroup = new DiscoveryGroup(localTarget.getNodeID(), 
config.getName(), config.getRefreshTimeout(), 
config.getBroadcastEndpointFactory(), null);
+      discoveryGroup = new DiscoveryGroup(localTarget.getNodeID(), 
config.getName(), config.getRefreshTimeout(), config.getStoppingTimeout(), 
config.getBroadcastEndpointFactory(), null);
       discoveryGroup.registerListener(this);
 
       discoveryGroup.start();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
index 290c4461c2..bd5919d5e4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
 
 import java.io.EOFException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -24,8 +26,11 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
 import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.jupiter.api.Test;
 
 public class SymmetricClusterWithDiscoveryTest extends SymmetricClusterTest {
@@ -89,6 +94,49 @@ public class SymmetricClusterWithDiscoveryTest extends 
SymmetricClusterTest {
       doTestStartStopServers(false);
    }
 
+   @Test
+   public void testStartStopServersWithWrongConnectorConfigurations() throws 
Exception {
+      try (AssertionLoggerHandler loggerHandler = new 
AssertionLoggerHandler()) {
+         setupCluster();
+
+         for (int node = 0; node < 5; node++) {
+            final int serverNode = node;
+
+            // Set wrong connector configurations
+            Map<String, TransportConfiguration> wrongConnectorConfigurations = 
new HashMap<>();
+            
getServer(node).getConfiguration().getConnectorConfigurations().forEach((key, 
transportConfiguration) -> {
+               TransportConfiguration wrongtransportConfiguration = new 
TransportConfiguration(
+                       transportConfiguration.getFactoryClassName(),
+                       new HashMap<>(transportConfiguration.getParams()),
+                       transportConfiguration.getName(),
+                       new HashMap<>(transportConfiguration.getExtraParams()));
+               if (isNetty()) {
+                  wrongtransportConfiguration.getParams().put("port", 
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT
 + serverNode + 10);
+               } else {
+                  wrongtransportConfiguration.getParams().put("serverId", 
String.valueOf(serverNode + 10));
+               }
+               wrongConnectorConfigurations.put(key, 
wrongtransportConfiguration);
+            });
+            
getServer(node).getConfiguration().setConnectorConfigurations(wrongConnectorConfigurations);
+
+            // Reduce the discovery stopping timeout to speed up the test
+            
getServer(node).getConfiguration().getDiscoveryGroupConfigurations().forEach((s,
 discoveryGroupConfiguration) -> 
discoveryGroupConfiguration.setStoppingTimeout(1));
+
+            // Reduce the topology scanner attempts to speed up the test
+            
getServer(node).getConfiguration().getClusterConfigurations().forEach(
+                    clusterConnectionConfiguration -> 
clusterConnectionConfiguration.setTopologyScannerAttempts(1));
+         }
+
+         startServers();
+
+         validateTopologySize(1, 0, 1, 2, 3, 4);
+
+         Wait.assertTrue(() -> loggerHandler.findText("AMQ224144"));
+
+         stopServers();
+      }
+   }
+
    @Override
    protected void setupProxy(int nodeId) {
       
getServer(nodeId).getConfiguration().getDiscoveryGroupConfigurations().get("dg1").
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java
index 54634f0640..2a18b03e03 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
 import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
 import org.apache.activemq.artemis.core.cluster.DiscoveryListener;
@@ -153,7 +154,7 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
                                               final int groupPort,
                                               final long timeout,
                                               NotificationService notif) 
throws Exception {
-      return new DiscoveryGroup(nodeID, name, timeout, new 
UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localBindAddress
 != null ? localBindAddress.getHostAddress() : "localhost"), notif);
+      return new DiscoveryGroup(nodeID, name, timeout, 
ActiveMQClient.DEFAULT_DISCOVERY_STOPPING_TIMEOUT, new 
UDPBroadcastEndpointFactory().setGroupAddress(groupAddress.getHostAddress()).setGroupPort(groupPort).setLocalBindAddress(localBindAddress
 != null ? localBindAddress.getHostAddress() : "localhost"), notif);
    }
 
    protected final class FakeNodeManager extends NodeManager {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
index 5041b6467e..447a3808b2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
 import 
org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
@@ -134,7 +135,7 @@ public class DiscoveryTest extends DiscoveryBaseTest {
 
       bg.addConnector(live1);
 
-      dg = new DiscoveryGroup(nodeID + "1", "broadcast", 5000L, new 
JGroupsFileBroadcastEndpointFactory().setChannelName("tst").setFile(TEST_JGROUPS_CONF_FILE),
 null);
+      dg = new DiscoveryGroup(nodeID + "1", "broadcast", 5000L, 
ActiveMQClient.DEFAULT_DISCOVERY_STOPPING_TIMEOUT, new 
JGroupsFileBroadcastEndpointFactory().setChannelName("tst").setFile(TEST_JGROUPS_CONF_FILE),
 null);
 
       dg.start();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to