This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new ae5208c543 ARTEMIS-5382 Scan cluster topology to merge subgroups
ae5208c543 is described below

commit ae5208c54301375d723999c16ab513cd9b897668
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Thu Apr 3 18:25:17 2025 +0200

    ARTEMIS-5382 Scan cluster topology to merge subgroups
    
    After the first cluster connection a broker only connects to the other 
brokers
    in its topology. Temporary network issues can cause cluster subgroups. In 
case
    of cluster subgroups the topology only contains the broker of its subgroup. 
So
    the cluster subgroups will remain separate.
---
 .../api/config/ActiveMQDefaultConfiguration.java   |  14 +-
 .../core/client/impl/ServerLocatorImpl.java        |  23 ++-
 .../core/client/impl/ServerLocatorInternal.java    |   3 +
 .../config/ClusterConnectionConfiguration.java     |  16 ++
 .../deployers/impl/FileConfigurationParser.java    |   4 +-
 .../artemis/core/server/ActiveMQServerLogger.java  |   5 +
 .../core/server/cluster/ClusterManager.java        |   4 +-
 .../server/cluster/impl/ClusterConnectionImpl.java | 196 +++++++++++++++++++-
 .../resources/schema/artemis-configuration.xsd     |   8 +
 .../core/config/impl/FileConfigurationTest.java    |   3 +
 .../impl/ClusterConnectionImplMockTest.java        |  31 +++-
 .../resources/ConfigurationTest-full-config.xml    |   2 +
 .../ConfigurationTest-xinclude-config.xml          |   2 +
 ...-xinclude-schema-config-cluster-connections.xml |   2 +
 .../cluster/distribution/SymmetricClusterTest.java | 201 ++++++++++++++++++++-
 .../SymmetricClusterWithBackupTest.java            |  10 +
 .../SymmetricClusterWithDiscoveryTest.java         | 129 ++++++++++++-
 17 files changed, 627 insertions(+), 26 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 9546ab2f95..fef52adb16 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -703,14 +703,18 @@ public final class ActiveMQDefaultConfiguration {
    // These properties used to defined with this prefix.
    // I'm keeping the older property name in an attempt to guarantee 
compatibility
    private static final String FORMER_ACK_RETRY_CLASS_NAME = 
"org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckRetry";
-   private static final int DEFAULT_MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS = 
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + 
".MIN_QUEUE_ATTEMPTS", "5"));;
-   private static final int DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS = 
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + 
".MAX_PAGE_ATTEMPT", "2"));;
+   private static final int DEFAULT_MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS = 
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + 
".MIN_QUEUE_ATTEMPTS", "5"));
+   private static final int DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS = 
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + 
".MAX_PAGE_ATTEMPT", "2"));
 
-   private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY = 
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + 
".RETRY_DELAY", "100"));;
+   private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY = 
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME + 
".RETRY_DELAY", "100"));
 
    private static final boolean DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED = 
false;
    private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;
 
+
+   private static final int DEFAULT_CLUSTER_TOPOLOGY_SCANNER_ATTEMPTS = 30;
+
+
    /**
     * If {@code true} then the ActiveMQ Artemis Server will make use of any 
Protocol Managers that are in available on
     * the classpath. If false then only the core protocol will be available, 
unless in Embedded mode where users can
@@ -1997,4 +2001,8 @@ public final class ActiveMQDefaultConfiguration {
    public static int getInitialQueueBufferSize() {
       return INITIAL_QUEUE_BUFFER_SIZE;
    }
+
+   public static int getClusterTopologyScannerAttempts() {
+      return DEFAULT_CLUSTER_TOPOLOGY_SCANNER_ATTEMPTS;
+   }
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index f1fe47a867..35c518ff4c 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -187,6 +187,8 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
 
    private String passwordCodec;
 
+   private DiscoveryListener discoveryListener;
+
    public static synchronized void clearThreadPools() {
       ActiveMQClient.clearThreadPools();
    }
@@ -440,6 +442,7 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
       groupID = locator.groupID;
       nodeID = locator.nodeID;
       clusterTransportConfiguration = locator.clusterTransportConfiguration;
+      discoveryListener = locator.discoveryListener;
    }
 
    private boolean useInitConnector() {
@@ -586,6 +589,12 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
       return afterConnectListener;
    }
 
+   @Override
+   public ServerLocatorImpl setDiscoveryListener(DiscoveryListener 
discoveryListener) {
+      this.discoveryListener = discoveryListener;
+      return this;
+   }
+
    @Override
    public ClientSessionFactory createSessionFactory(String nodeID) throws 
Exception {
       TopologyMember topologyMember = topology.getMember(nodeID);
@@ -1624,18 +1633,18 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
 
    @Override
    public synchronized void connectorsChanged(List<DiscoveryEntry> 
newConnectors) {
+      DiscoveryListener discoveryListener = this.discoveryListener;
+
       if (receivedTopology) {
+         if (discoveryListener != null) {
+            discoveryListener.connectorsChanged(newConnectors);
+         }
          return;
       }
 
       final List<TransportConfiguration> newInitialconnectors = new 
ArrayList<>(newConnectors.size());
 
       for (DiscoveryEntry entry : newConnectors) {
-         if (ha && topology.getMember(entry.getNodeID()) == null) {
-            TopologyMemberImpl member = new 
TopologyMemberImpl(entry.getNodeID(), null, null, entry.getConnector(), null);
-            // on this case we set it as zero as any update coming from server 
should be accepted
-            topology.updateMember(0, entry.getNodeID(), member);
-         }
          // ignore its own transport connector
          if (!entry.getConnector().equals(clusterTransportConfiguration)) {
             newInitialconnectors.add(entry.getConnector());
@@ -1661,6 +1670,10 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
             connectRunnable.run();
          }
       }
+
+      if (discoveryListener != null) {
+         discoveryListener.connectorsChanged(newConnectors);
+      }
    }
 
    @Override
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
index b39cfd58ae..4338d7a9b1 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.cluster.DiscoveryListener;
 import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
 
 public interface ServerLocatorInternal extends ServerLocator {
@@ -35,6 +36,8 @@ public interface ServerLocatorInternal extends ServerLocator {
 
    ServerLocatorInternal 
setAfterConnectionInternalListener(AfterConnectInternalListener listener);
 
+   ServerLocatorInternal setDiscoveryListener(DiscoveryListener listener);
+
    /**
     * Used to better identify Cluster Connection Locators on logs. To 
facilitate eventual debugging.
     * <p>
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
index 5cbbb13cb9..5b9d47dc3f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
@@ -85,6 +85,8 @@ public final class ClusterConnectionConfiguration implements 
Serializable {
 
    private String clientId;
 
+   private int topologyScannerAttempts = 
ActiveMQDefaultConfiguration.getClusterTopologyScannerAttempts();
+
    public ClusterConnectionConfiguration() {
    }
 
@@ -318,6 +320,15 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
       return this;
    }
 
+   public int getTopologyScannerAttempts() {
+      return topologyScannerAttempts;
+   }
+
+   public ClusterConnectionConfiguration setTopologyScannerAttempts(int 
topologyScannerAttempts) {
+      this.topologyScannerAttempts = topologyScannerAttempts;
+      return this;
+   }
+
    /**
     * This method will match the configuration and return the proper 
TransportConfiguration for the Configuration
     */
@@ -402,6 +413,7 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
       result = prime * result + (int) (temp ^ (temp >>> 32));
       result = prime * result + ((staticConnectors == null) ? 0 : 
staticConnectors.hashCode());
       result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
+      result = prime * result + topologyScannerAttempts;
       return result;
    }
 
@@ -510,6 +522,9 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
       } else if (!clientId.equals(other.clientId)) {
          return false;
       }
+      if (topologyScannerAttempts != other.topologyScannerAttempts) {
+         return false;
+      }
       return true;
    }
 
@@ -540,6 +555,7 @@ public final class ClusterConnectionConfiguration 
implements Serializable {
          ", clusterNotificationInterval=" + clusterNotificationInterval +
          ", clusterNotificationAttempts=" + clusterNotificationAttempts +
          ", clientId=" + clientId +
+         ", topologyScannerInterval=" + topologyScannerAttempts +
          '}';
    }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 31c7b40d6b..7d6839fffe 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -2360,6 +2360,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       int clusterNotificationAttempts = getInteger(e, "notification-attempts", 
ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), GT_ZERO);
 
+      int topologyScannerAttempts = getInteger(e, "topology-scanner-attempts", 
ActiveMQDefaultConfiguration.getClusterTopologyScannerAttempts(), 
MINUS_ONE_OR_POSITIVE_INT);
+
       String discoveryGroupName = null;
 
       List<String> staticConnectorNames = new ArrayList<>();
@@ -2384,7 +2386,7 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       String clientId = getString(e, "client-id", null, NO_CHECK);
 
-      ClusterConnectionConfiguration config = new 
ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).s
 [...]
+      ClusterConnectionConfiguration config = new 
ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).s
 [...]
 
       if (discoveryGroupName == null) {
          config.setStaticConnectors(staticConnectorNames);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 52767b92d9..29c92a329c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
+import org.apache.activemq.artemis.core.client.impl.Topology;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -1495,4 +1496,8 @@ public interface ActiveMQServerLogger {
 
    @LogMessage(id = 224143, value = "Bridge {} failed to send {}: {} {}", 
level = LogMessage.Level.WARN)
    void bridgeFailedToSend(String bridgeName, String message, String 
exceptionName, String exceptionMessage);
+
+   @LogMessage(id = 224144, value = "The topology of the cluster connection {} 
doesn't include all th expected members. "
+       + "Check the discovery group or the static connectors of the cluster 
connection if the topology is correct: {} / {}", level = LogMessage.Level.WARN)
+   void incompleteClusterTopology(String clusterConnection, Topology topology, 
String topologyMembers);
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index 4d86c25c2f..f1790300fb 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -616,7 +616,7 @@ public class ClusterManager implements ActiveMQComponent {
             logger.debug("{} Starting a Discovery Group Cluster Connection, 
name={}, dg={}", this, config.getDiscoveryGroupName(), dg);
          }
 
-         clusterConnection = new ClusterConnectionImpl(this, dg, connector, 
SimpleString.of(config.getName()), SimpleString.of(config.getAddress() != null 
? config.getAddress() : ""), config.getMinLargeMessageSize(), 
config.getClientFailureCheckPeriod(), config.getConnectionTTL(), 
config.getRetryInterval(), config.getRetryIntervalMultiplier(), 
config.getMaxRetryInterval(), config.getInitialConnectAttempts(), 
config.getReconnectAttempts(), config.getCallTimeout(), 
config.getCallFailoverTi [...]
+         clusterConnection = new ClusterConnectionImpl(this, dg, connector, 
SimpleString.of(config.getName()), SimpleString.of(config.getAddress() != null 
? config.getAddress() : ""), config.getMinLargeMessageSize(), 
config.getClientFailureCheckPeriod(), config.getConnectionTTL(), 
config.getRetryInterval(), config.getRetryIntervalMultiplier(), 
config.getMaxRetryInterval(), config.getInitialConnectAttempts(), 
config.getReconnectAttempts(), config.getCallTimeout(), 
config.getCallFailoverTi [...]
 
          clusterController.addClusterConnection(clusterConnection.getName(), 
dg, config, connector);
       } else {
@@ -626,7 +626,7 @@ public class ClusterManager implements ActiveMQComponent {
             logger.debug("{} defining cluster connection towards {}", this, 
Arrays.toString(tcConfigs));
          }
 
-         clusterConnection = new ClusterConnectionImpl(this, tcConfigs, 
connector, SimpleString.of(config.getName()), 
SimpleString.of(config.getAddress()), config.getMinLargeMessageSize(), 
config.getClientFailureCheckPeriod(), config.getConnectionTTL(), 
config.getRetryInterval(), config.getRetryIntervalMultiplier(), 
config.getMaxRetryInterval(), config.getInitialConnectAttempts(), 
config.getReconnectAttempts(), config.getCallTimeout(), 
config.getCallFailoverTimeout(), config.isDuplicateD [...]
+         clusterConnection = new ClusterConnectionImpl(this, tcConfigs, 
connector, SimpleString.of(config.getName()), 
SimpleString.of(config.getAddress()), config.getMinLargeMessageSize(), 
config.getClientFailureCheckPeriod(), config.getConnectionTTL(), 
config.getRetryInterval(), config.getRetryIntervalMultiplier(), 
config.getMaxRetryInterval(), config.getInitialConnectAttempts(), 
config.getReconnectAttempts(), config.getCallTimeout(), 
config.getCallFailoverTimeout(), config.isDuplicateD [...]
 
          clusterController.addClusterConnection(clusterConnection.getName(), 
tcConfigs, config, connector);
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 9ba6b838fe..7efaa628bf 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -18,11 +18,15 @@ package 
org.apache.activemq.artemis.core.server.cluster.impl;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
@@ -48,12 +52,15 @@ import 
org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.client.impl.Topology;
 import org.apache.activemq.artemis.core.client.impl.TopologyManager;
 import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
+import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
+import org.apache.activemq.artemis.core.cluster.DiscoveryListener;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.NodeManager;
@@ -78,8 +85,9 @@ import 
org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicInteger;
 
-public final class ClusterConnectionImpl implements ClusterConnection, 
AfterConnectInternalListener, TopologyManager {
+public final class ClusterConnectionImpl implements ClusterConnection, 
AfterConnectInternalListener, TopologyManager, DiscoveryListener {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -183,6 +191,12 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
 
    private final String clientId;
 
+   private volatile List<DiscoveryEntry> discoveryEntries;
+
+   private final TransportConfiguration[] staticTransportConfigurations;
+
+   private final TopologyScanner topologyScanner;
+
    /**
     * For tests only
     */
@@ -222,7 +236,8 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
                                 final boolean allowDirectConnectionsOnly,
                                 final long clusterNotificationInterval,
                                 final int clusterNotificationAttempts,
-                                final String clientId) throws Exception {
+                                final String clientId,
+                                final int topologyScannerAttempts) throws 
Exception {
       this.nodeManager = nodeManager;
 
       this.connector = connector;
@@ -307,6 +322,11 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
       this.storeAndForwardPrefix = server.getInternalNamingPrefix() + 
SN_PREFIX;
 
       this.clientId = clientId;
+
+      this.staticTransportConfigurations = staticTranspConfigs;
+
+      this.topologyScanner = topologyScannerAttempts == 0 ? null : new 
TopologyScanner(
+          scheduledExecutor, executor, topologyScannerAttempts);
    }
 
    public ClusterConnectionImpl(final ClusterManager manager,
@@ -340,7 +360,8 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
                                 final boolean allowDirectConnectionsOnly,
                                 final long clusterNotificationInterval,
                                 final int clusterNotificationAttempts,
-                                final String clientId) throws Exception {
+                                final String clientId,
+                                final int topologyScannerAttempts) throws 
Exception {
       this.nodeManager = nodeManager;
 
       this.connector = connector;
@@ -410,6 +431,11 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
       this.storeAndForwardPrefix = server.getInternalNamingPrefix() + 
SN_PREFIX;
 
       this.clientId = clientId;
+
+      this.staticTransportConfigurations = null;
+
+      this.topologyScanner = topologyScannerAttempts == 0 ? null : new 
TopologyScanner(
+          scheduledExecutor, executor, topologyScannerAttempts);
    }
 
    @Override
@@ -446,6 +472,11 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
 
       if (serverLocator != null) {
          serverLocator.removeClusterTopologyListener(this);
+         serverLocator.setDiscoveryListener(null);
+      }
+
+      if (topologyScanner != null) {
+         topologyScanner.stop();
       }
 
       if (logger.isDebugEnabled()) {
@@ -593,6 +624,164 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
       // false,
       // localMember.getConnector().a,
       // localMember.getConnector().b);
+
+      if (topologyScanner != null && !stopping) {
+         topologyScanner.start();
+         topologyScanner.resetCounter();
+         topologyScanner.delay();
+      }
+   }
+
+   @Override
+   public void connectorsChanged(List<DiscoveryEntry> newConnectors) {
+      discoveryEntries = newConnectors;
+
+      if (topologyScanner != null && !stopping && topologyScanner.isStarted()) 
{
+         topologyScanner.resetCounter();
+         topologyScanner.delay();
+      }
+   }
+
+   public TopologyScanner getTopologyScanner() {
+      return topologyScanner;
+   }
+
+   public final class TopologyScanner extends ActiveMQScheduledComponent {
+
+      private final int attempts;
+
+      private final AtomicInteger counter = new AtomicInteger();
+
+      private volatile boolean running = false;
+
+      public boolean isRunning() {
+         return running;
+      }
+
+      TopologyScanner(ScheduledExecutorService scheduledExecutorService, 
Executor executor, int attempts) {
+         super(scheduledExecutorService, executor, retryInterval, 
TimeUnit.MILLISECONDS, true);
+         this.attempts = attempts;
+      }
+
+      @Override
+      public boolean delay() {
+         running = true;
+
+         return super.delay();
+      }
+
+      public void resetCounter() {
+         counter.set(0);
+      }
+
+      @Override
+      public void run() {
+         TransportConfiguration[] transportConfigurations = null;
+
+         if (staticTransportConfigurations != null) {
+            transportConfigurations = staticTransportConfigurations;
+         } else {
+            List<DiscoveryEntry> discoveredEntries = discoveryEntries;
+
+            if (discoveredEntries != null) {
+               transportConfigurations = discoveryEntries.stream()
+                   .map(discoveryEntry -> discoveryEntry.getConnector())
+                   .toArray(TransportConfiguration[]::new);
+            } else {
+               logger.debug("No discovered entries");
+            }
+         }
+
+         if (transportConfigurations != null) {
+            boolean topologyUpdated = updateTopology(transportConfigurations);
+
+            int topologyScannerCount = counter.incrementAndGet();
+
+            boolean retry = (attempts == -1 || topologyScannerCount < 
attempts);
+
+            if (!topologyUpdated && !stopping && retry) {
+               delay();
+            } else {
+               running = false;
+
+               if (!topologyUpdated && !stopping) {
+                  
ActiveMQServerLogger.LOGGER.incompleteClusterTopology(name.toString(),
+                      topology, topology.getMembers().toString());
+               }
+            }
+         }
+      }
+
+      private boolean updateTopology(TransportConfiguration[] 
transportConfigurations) {
+         boolean result = true;
+
+         for (TransportConfiguration transportConfiguration : 
transportConfigurations) {
+            if (!topology.getMembers().stream().anyMatch(member -> 
compareTCs(connector, transportConfiguration) ||
+                member.getPrimary() != null && compareTCs(member.getPrimary(), 
transportConfiguration) ||
+                member.getBackup() != null && compareTCs(member.getBackup(), 
transportConfiguration))) {
+
+               try (ServerLocatorInternal targetLocator = new 
ServerLocatorImpl(topology, true, transportConfiguration)) {
+                  targetLocator.setReconnectAttempts(0);
+                  targetLocator.setInitialConnectAttempts(0);
+                  targetLocator.setConnectionTTL(connectionTTL);
+                  targetLocator.setCallTimeout(callTimeout);
+                  targetLocator.setNodeID(nodeManager.getNodeId().toString());
+                  targetLocator.setClusterTransportConfiguration(connector);
+                  targetLocator.setIdentity("(Cluster-topology-scanner::" + 
server.toString() + ")");
+
+                  try {
+                     try (ClientSessionFactoryInternal 
targetClientSessionFactory = targetLocator.connect()) {
+                        boolean waitForTopology = 
targetClientSessionFactory.waitForTopology(
+                            callTimeout, TimeUnit.MILLISECONDS);
+
+                        if (logger.isDebugEnabled()) {
+                           logger.debug("Cluster topology scanner 
waitForTopology from {}: {}",
+                               transportConfiguration, waitForTopology);
+                        }
+                     }
+                  } catch (ActiveMQException e) {
+                     result = false;
+
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Cluster topology scanner failed to 
connect to {}: {}",
+                            transportConfiguration, e);
+                     }
+                  }
+               }
+            }
+         }
+
+         return result;
+      }
+
+      protected static boolean compareTCs(TransportConfiguration config, 
TransportConfiguration otherConfig) {
+         if (config.getFactoryClassName().contains("Netty") && 
otherConfig.getFactoryClassName().contains("Netty")) {
+            return Objects.equals(config.getParams().get("port"), 
otherConfig.getParams().get("port")) &&
+                compareHosts((String)config.getParams().get("host"), 
(String)otherConfig.getParams().get("host"));
+         } else if (config.getFactoryClassName().contains("InVM") && 
otherConfig.getFactoryClassName().contains("InVM")) {
+            return Objects.equals(config.getParams().get("serverId"), 
otherConfig.getParams().get("serverId"));
+         }
+
+         return false;
+      }
+
+      private static boolean compareHosts(String host, String otherHost) {
+         if (Objects.equals(host, otherHost)) {
+            return true;
+         }
+
+         if (host != null && otherHost != null) {
+            try {
+               InetAddress hostAddr = InetAddress.getByName(host);
+               InetAddress otherHostAddr = InetAddress.getByName(otherHost);
+               return hostAddr.equals(otherHostAddr);
+            } catch (UnknownHostException e) {
+               logger.debug("Error resolving hosts: {}", e);
+            }
+         }
+
+         return false;
+      }
    }
 
    @Override
@@ -704,6 +893,7 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
          }
 
          serverLocator.setAfterConnectionInternalListener(this);
+         serverLocator.setDiscoveryListener(this);
 
          
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator,
 server.getStorageManager()));
 
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 1b3698788b..2329f4ad3e 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2630,6 +2630,14 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="topology-scanner-attempts" type="xsd:int" 
default="-1" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  maximum number of topology scanner attempts, -1 means 'no 
limits'
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element ref="discovery-type" maxOccurs="1" minOccurs="0"/>
 
       </xsd:all>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index ec7a6031c6..799a364112 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -464,6 +464,7 @@ public class FileConfigurationTest extends 
AbstractConfigurationTestBase {
             
assertEquals(ActiveMQDefaultConfiguration.getDefaultClusterCallTimeout(), 
ccc.getCallTimeout());
             
assertEquals(ActiveMQDefaultConfiguration.getDefaultClusterCallFailoverTimeout(),
 ccc.getCallFailoverTimeout());
             assertEquals("myClientID", ccc.getClientId());
+            
assertEquals(ActiveMQDefaultConfiguration.getClusterTopologyScannerAttempts(), 
ccc.getTopologyScannerAttempts());
          } else if (ccc.getName().equals("cluster-connection1")) {
             assertEquals("cluster-connection1", ccc.getName());
             assertEquals(321, ccc.getMinLargeMessageSize(), 
"clusterConnectionConf minLargeMessageSize");
@@ -484,6 +485,7 @@ public class FileConfigurationTest extends 
AbstractConfigurationTestBase {
             assertNull(ccc.getDiscoveryGroupName());
             assertEquals(222, ccc.getProducerWindowSize());
             assertTrue(ccc.isAllowDirectConnectionsOnly());
+            assertEquals(-1, ccc.getTopologyScannerAttempts());
          } else {
             assertEquals("cluster-connection2", ccc.getName());
             assertEquals("queues2", ccc.getAddress());
@@ -497,6 +499,7 @@ public class FileConfigurationTest extends 
AbstractConfigurationTestBase {
             assertEquals("dg1", ccc.getDiscoveryGroupName());
             assertEquals(333, ccc.getProducerWindowSize());
             assertFalse(ccc.isAllowDirectConnectionsOnly());
+            assertEquals(30, ccc.getTopologyScannerAttempts());
          }
       }
 
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImplMockTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImplMockTest.java
index ca90c737a3..d6501e5dbb 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImplMockTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImplMockTest.java
@@ -19,13 +19,16 @@ package 
org.apache.activemq.artemis.core.server.cluster.impl;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Map;
 import java.util.concurrent.Executors;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
+import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActivateCallback;
 import org.apache.activemq.artemis.core.server.NodeManager;
@@ -87,7 +90,8 @@ public class ClusterConnectionImplMockTest extends 
ServerTestBase {
                 true, //final boolean allowDirectConnectionsOnly,
                 0, //final long clusterNotificationInterval,
                 0, //final int clusterNotificationAttempts)
-                null
+                null,
+                0
       );
 
       assertEquals(1, cci.allowableConnections.size());
@@ -106,7 +110,7 @@ public class ClusterConnectionImplMockTest extends 
ServerTestBase {
       ArtemisExecutor executor = 
ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())));
 
       try {
-         ClusterConnectionImpl cci = new ClusterConnectionImpl(null, new 
TransportConfiguration[]{tc}, null, null, null, 0, 0L, 0L, 0L, 0, 0L, 0, 0, 0L, 
0L, false, null, 0, 0, () -> executor, new MockServer(), null, null, null, 0, 
new FakeNodeManager(UUIDGenerator.getInstance().generateStringUUID()), null, 
null, true, 0, 0, null);
+         ClusterConnectionImpl cci = new ClusterConnectionImpl(null, new 
TransportConfiguration[]{tc}, null, null, null, 0, 0L, 0L, 0L, 0, 0L, 0, 0, 0L, 
0L, false, null, 0, 0, () -> executor, new MockServer(), null, null, null, 0, 
new FakeNodeManager(UUIDGenerator.getInstance().generateStringUUID()), null, 
null, true, 0, 0, null, 0);
 
          TopologyMember topologyMember = new 
TopologyMemberImpl(RandomUtil.randomUUIDString(), null, null, null, null);
          cci.nodeUP(topologyMember, false);
@@ -115,6 +119,29 @@ public class ClusterConnectionImplMockTest extends 
ServerTestBase {
       }
    }
 
+   @Test
+   public void testCompareTCs() {
+      TransportConfiguration netty1 = new 
TransportConfiguration(NettyConnectorFactory.class.getName(), Map.of("host", 
"localhost", "port", 61616));
+      assertTrue(ClusterConnectionImpl.TopologyScanner.compareTCs(netty1, 
netty1));
+
+      TransportConfiguration netty2 = new 
TransportConfiguration(NettyConnectorFactory.class.getName(), Map.of("host", 
"127.0.0.1", "port", 61616));
+      assertTrue(ClusterConnectionImpl.TopologyScanner.compareTCs(netty2, 
netty2));
+
+      assertTrue(ClusterConnectionImpl.TopologyScanner.compareTCs(netty1, 
netty2));
+
+      TransportConfiguration netty3 = new 
TransportConfiguration(NettyConnectorFactory.class.getName(), Map.of("host", 
"my-host", "port", 61617));
+      assertTrue(ClusterConnectionImpl.TopologyScanner.compareTCs(netty3, 
netty3));
+
+      TransportConfiguration netty4 = new 
TransportConfiguration(NettyConnectorFactory.class.getName(), Map.of("host", 
"192.168.0.1", "port", 61617));
+      assertTrue(ClusterConnectionImpl.TopologyScanner.compareTCs(netty4, 
netty4));
+
+      assertFalse(ClusterConnectionImpl.TopologyScanner.compareTCs(netty1, 
netty3));
+      assertFalse(ClusterConnectionImpl.TopologyScanner.compareTCs(netty1, 
netty4));
+      assertFalse(ClusterConnectionImpl.TopologyScanner.compareTCs(netty2, 
netty3));
+      assertFalse(ClusterConnectionImpl.TopologyScanner.compareTCs(netty2, 
netty4));
+      assertFalse(ClusterConnectionImpl.TopologyScanner.compareTCs(netty3, 
netty4));
+   }
+
    static final class MockServer extends ActiveMQServerImpl {
 
       @Override
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 03d54aeacc..be6d3684af 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -440,6 +440,7 @@
                <connector-ref>connector1</connector-ref>
                <connector-ref>connector2</connector-ref>
             </static-connectors>
+            <topology-scanner-attempts>-1</topology-scanner-attempts>
          </cluster-connection>
          <cluster-connection name="cluster-connection2">
             <discovery-group-ref discovery-group-name="dg1"/>
@@ -452,6 +453,7 @@
             <message-load-balancing>STRICT</message-load-balancing>
             <max-hops>2</max-hops>
             <producer-window-size>333</producer-window-size>
+            <topology-scanner-attempts>30</topology-scanner-attempts>
          </cluster-connection>
          <cluster-connection name="cluster-connection3">
             <connector-ref>connector2</connector-ref>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 33e8f8de60..fd426361a2 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -328,6 +328,7 @@
                <connector-ref>connector1</connector-ref>
                <connector-ref>connector2</connector-ref>
             </static-connectors>
+            <topology-scanner-attempts>-1</topology-scanner-attempts>
          </cluster-connection>
          <cluster-connection name="cluster-connection2">
             <address>queues2</address>
@@ -340,6 +341,7 @@
             <producer-window-size>333</producer-window-size>
             <call-failover-timeout>456</call-failover-timeout>
             <discovery-group-ref discovery-group-name="dg1"/>
+            <topology-scanner-attempts>30</topology-scanner-attempts>
          </cluster-connection>
          <cluster-connection name="cluster-connection3">
             <connector-ref>connector2</connector-ref>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-cluster-connections.xml
 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-cluster-connections.xml
index b4fe4468a6..59912341d6 100644
--- 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-cluster-connections.xml
+++ 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-cluster-connections.xml
@@ -35,6 +35,7 @@
          <connector-ref>connector1</connector-ref>
          <connector-ref>connector2</connector-ref>
       </static-connectors>
+      <topology-scanner-attempts>-1</topology-scanner-attempts>
    </cluster-connection>
    <cluster-connection name="cluster-connection2">
       <address>queues2</address>
@@ -47,6 +48,7 @@
       <producer-window-size>333</producer-window-size>
       <call-failover-timeout>456</call-failover-timeout>
       <discovery-group-ref discovery-group-name="dg1"/>
+      <topology-scanner-attempts>30</topology-scanner-attempts>
    </cluster-connection>
    <cluster-connection name="cluster-connection3">
       <connector-ref>connector2</connector-ref>
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterTest.java
index 5181233883..6226b2f9df 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterTest.java
@@ -19,13 +19,28 @@ package 
org.apache.activemq.artemis.tests.integration.cluster.distribution;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
+import 
org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
+import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
+import 
org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
+import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.Connector;
+import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.jupiter.api.BeforeEach;
@@ -1396,24 +1411,42 @@ public class SymmetricClusterTest extends 
ClusterTestBase {
 
    @Test
    public void testStartStopServers() throws Exception {
-      doTestStartStopServers();
+      doTestStartStopServers(false);
    }
 
-   protected void validateTopologSize(int expectedSize, int... 
serverParameters) throws Exception {
+   @Test
+   public void testStartStopServersWithPartition() throws Exception {
+      doTestStartStopServers(true);
+   }
+
+   protected void validateTopologySize(int expectedSize, int... 
serverParameters) throws Exception {
       for (int s : serverParameters) {
          logger.debug("Checking topology size on node {}, expecting it to be 
{}", s, expectedSize);
 
          assertNotNull(servers[s], "Server[" + s + "] is null");
 
          for (ClusterConnection c : 
servers[s].getClusterManager().getClusterConnections()) {
-            Wait.assertEquals(expectedSize, () -> 
c.getTopology().getMembers().size(), 5000);
+            Wait.assertEquals(expectedSize, () -> 
c.getTopology().getMembers().size(), 5000, Wait.SLEEP_MILLIS, () -> {
+               Collection<TopologyMemberImpl> members = 
c.getTopology().getMembers();
+               return "SRV[" + s + "]/CC[" + c.getName() + "] has " + 
members.size() + "/" + expectedSize + " members:\n" +
+                   String.join("\n", members.stream().map(topologyMember -> 
topologyMember.toString()).collect(Collectors.toList()));
+            });
          }
       }
    }
 
-   public void doTestStartStopServers() throws Exception {
+   public void doTestStartStopServers(boolean withPartition) throws Exception {
       setupCluster();
 
+      if (withPartition) {
+         setupProxy();
+         enablePartition();
+
+         for (int node = 0; node < 5; node++) {
+            
getServer(node).getConfiguration().getClusterConfigurations().get(0).setTopologyScannerAttempts(-1);
+         }
+      }
+
       startServers();
 
       setupSessionFactory(0, isNetty());
@@ -1422,7 +1455,22 @@ public class SymmetricClusterTest extends 
ClusterTestBase {
       setupSessionFactory(3, isNetty());
       setupSessionFactory(4, isNetty());
 
-      validateTopologSize(5, 0, 1, 2, 3, 4);
+      if (withPartition) {
+         validateTopologySize(3, 0, 1, 2);
+         validateTopologySize(2, 3, 4);
+
+         disablePartition();
+      }
+
+      validateTopologySize(5, 0, 1, 2, 3, 4);
+
+      if (withPartition) {
+         for (int node = 0; node < 5; node++) {
+            final int serverNode = node;
+            Wait.assertTrue(() -> 
!((ClusterConnectionImpl)getServer(serverNode).getClusterManager().
+                getClusterConnection("cluster" + 
serverNode)).getTopologyScanner().isRunning());
+         }
+      }
 
       createQueue(0, "queues.testaddress", "queue0", null, false);
       createQueue(1, "queues.testaddress", "queue1", null, false);
@@ -1535,16 +1583,16 @@ public class SymmetricClusterTest extends 
ClusterTestBase {
 
       stopServers(0, 3);
 
-      validateTopologSize(3, 1, 2, 4);
+      validateTopologySize(3, 1, 2, 4);
 
       startServers(3, 0);
 
-      validateTopologSize(5, 0, 1, 2, 3, 4);
+      validateTopologySize(5, 0, 1, 2, 3, 4);
 
       setupSessionFactory(0, isNetty());
       setupSessionFactory(3, isNetty());
 
-      validateTopologSize(5, 0, 1, 2, 3, 4);
+      validateTopologySize(5, 0, 1, 2, 3, 4);
 
       createQueue(0, "queues.testaddress", "queue0", null, false);
       createQueue(3, "queues.testaddress", "queue3", null, false);
@@ -2010,4 +2058,141 @@ public class SymmetricClusterTest extends 
ClusterTestBase {
    protected boolean isFileStorage() {
       return false;
    }
+
+   protected void setupProxy() {
+      setupProxy(0);
+      setupProxy(1);
+      setupProxy(2);
+      setupProxy(3);
+      setupProxy(4);
+   }
+
+   protected void setupProxy(final int nodeId) {
+      
getServer(nodeId).getConfiguration().getConnectorConfigurations().forEach((s, 
transportConfiguration) -> {
+         transportConfiguration.getExtraParams().put("proxyNodeId", nodeId);
+         if (isNetty()) {
+            
transportConfiguration.setFactoryClassName(NettyProxyConnectorFactory.class.getName());
+         } else {
+            
transportConfiguration.setFactoryClassName(InVMProxyConnectorFactory.class.getName());
+         }
+      });
+   }
+
+   protected void enablePartition() {
+      ProxyConnectorFactory.interceptor = (nodeId, configuration) -> {
+         int partitionId = (nodeId % 5) / 3;
+         int targetPartitionId;
+
+         if (isNetty()) {
+            int targetPort = 
(int)configuration.get(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME);
+            targetPartitionId = ((targetPort - 
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT)
 % 5) / 3;
+         } else {
+            int targetServerId = 
(int)configuration.get(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME);
+            targetPartitionId = (targetServerId % 5) / 3;
+         }
+
+         return partitionId == targetPartitionId;
+      };
+   }
+
+   protected void disablePartition() {
+      ProxyConnectorFactory.interceptor = null;
+   }
+
+   public interface ProxyConnectorInterceptor {
+      boolean allowConnection(int nodeId, Map<String, Object> configuration);
+   }
+
+   public static class InVMProxyConnectorFactory extends ProxyConnectorFactory 
{
+      public InVMProxyConnectorFactory() {
+         super(new InVMConnectorFactory());
+      }
+   }
+
+   public static class NettyProxyConnectorFactory extends 
ProxyConnectorFactory {
+      public NettyProxyConnectorFactory() {
+         super(new NettyConnectorFactory());
+      }
+   }
+
+   public abstract static class ProxyConnectorFactory implements 
ConnectorFactory {
+      public static volatile ProxyConnectorInterceptor interceptor;
+
+      private ConnectorFactory rawConnectorFactory;
+
+      public ProxyConnectorFactory(ConnectorFactory rawConnectorFactory) {
+         this.rawConnectorFactory = rawConnectorFactory;
+      }
+
+      @Override
+      public Map<String, Object> getDefaults() {
+         return rawConnectorFactory.getDefaults();
+      }
+
+      @Override
+      public Connector createConnector(Map<String, Object> configuration,
+          BufferHandler handler,
+          ClientConnectionLifeCycleListener listener,
+          Executor closeExecutor,
+          Executor threadPool,
+          ScheduledExecutorService scheduledThreadPool,
+          ClientProtocolManager protocolManager) {
+
+         return new ProxyConnector(rawConnectorFactory.createConnector(
+             configuration,
+             handler,
+             listener,
+             closeExecutor,
+             threadPool,
+             scheduledThreadPool,
+             protocolManager), configuration);
+      }
+
+      @Override
+      public boolean isReliable() {
+         return rawConnectorFactory.isReliable();
+      }
+
+      private class ProxyConnector implements Connector {
+         private Connector rawConnector;
+
+         private Map<String, Object> configuration;
+
+         ProxyConnector(Connector rawConnector, Map<String, Object> 
configuration) {
+            this.rawConnector = rawConnector;
+            this.configuration = configuration;
+         }
+
+         @Override
+         public void start() {
+            rawConnector.start();
+         }
+
+         @Override
+         public void close() {
+            rawConnector.close();
+         }
+
+         @Override
+         public boolean isStarted() {
+            return rawConnector.isStarted();
+         }
+
+         @Override
+         public Connection createConnection() {
+
+            ProxyConnectorInterceptor interceptor = 
ProxyConnectorFactory.interceptor;
+            if (interceptor == null || 
interceptor.allowConnection((int)configuration.get("proxyNodeId"), 
configuration)) {
+               return rawConnector.createConnection();
+            }
+
+            return null;
+         }
+
+         @Override
+         public boolean isEquivalent(Map<String, Object> configuration) {
+            return rawConnector.isEquivalent(configuration);
+         }
+      }
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
index 6db7332038..b423cce67d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
@@ -490,4 +490,14 @@ public class SymmetricClusterWithBackupTest extends 
SymmetricClusterTest {
       stopServers(5, 6, 7, 8, 9, 0, 1, 2, 3, 4);
    }
 
+   @Override
+   protected void setupProxy() {
+      super.setupProxy();
+
+      setupProxy(5);
+      setupProxy(6);
+      setupProxy(7);
+      setupProxy(8);
+      setupProxy(9);
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
index ab4b5133f9..290c4461c2 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
@@ -16,6 +16,14 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
 
+import java.io.EOFException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
+import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.jupiter.api.Test;
@@ -35,7 +43,14 @@ public class SymmetricClusterWithDiscoveryTest extends 
SymmetricClusterTest {
    @Test
    public void testStartStopServers() throws Exception {
       // When using discovery starting and stopping it too fast could have a 
race condition with UDP
-      doTestStartStopServers();
+      doTestStartStopServers(false);
+   }
+
+   @Override
+   @Test
+   public void testStartStopServersWithPartition() throws Exception {
+      // When using discovery starting and stopping it too fast could have a 
race condition with UDP
+      doTestStartStopServers(true);
    }
 
    @Override
@@ -71,6 +86,116 @@ public class SymmetricClusterWithDiscoveryTest extends 
SymmetricClusterTest {
     */
    @Test
    public void testStartStopServersWithPauseBeforeRestarting() throws 
Exception {
-      doTestStartStopServers();
+      doTestStartStopServers(false);
+   }
+
+   @Override
+   protected void setupProxy(int nodeId) {
+      
getServer(nodeId).getConfiguration().getDiscoveryGroupConfigurations().get("dg1").
+          setBroadcastEndpointFactory(new 
ProxyBroadcastEndpointFactory(nodeId, getServer(nodeId).
+          
getConfiguration().getDiscoveryGroupConfigurations().get("dg1").getBroadcastEndpointFactory()));
+   }
+
+   @Override
+   protected void enablePartition() {
+      super.enablePartition();
+
+      ProxyBroadcastEndpointFactory.interceptor = (nodeId, data) -> {
+         ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(data);
+         SimpleString originatingNodeID = SimpleString.of(buffer.readString());
+
+         for (int i = 0; i < servers.length; i++) {
+            if (servers[i] != null && 
originatingNodeID.equals(servers[i].getNodeID())) {
+               int partitionId = (i % 5) / 3;
+               int targetPartitionId = (nodeId % 5) / 3;
+
+               return partitionId == targetPartitionId;
+            }
+         }
+
+         return false;
+      };
+   }
+
+   @Override
+   protected void disablePartition() {
+      super.disablePartition();
+
+      ProxyBroadcastEndpointFactory.interceptor = null;
+   }
+
+   public interface ProxyBroadcastEndpointInterceptor {
+      boolean allowBroadcast(int nodeId, byte[] data);
+   }
+
+   public class ProxyBroadcastEndpointFactory implements 
BroadcastEndpointFactory {
+
+      public static volatile ProxyBroadcastEndpointInterceptor interceptor;
+
+      private int nodeId;
+      private final BroadcastEndpointFactory rawBroadcastEndpointFactory;
+
+      public ProxyBroadcastEndpointFactory(int nodeId, 
BroadcastEndpointFactory rawBroadcastEndpointFactory) {
+         this.nodeId = nodeId;
+         this.rawBroadcastEndpointFactory = rawBroadcastEndpointFactory;
+      }
+
+      @Override
+      public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
+         return new ProxyBroadcastEndpoint(nodeId, 
rawBroadcastEndpointFactory.createBroadcastEndpoint());
+      }
+
+      private class ProxyBroadcastEndpoint implements BroadcastEndpoint {
+
+         private int nodeId;
+         private volatile boolean open;
+         private final BroadcastEndpoint rawBroadcastEndpoint;
+
+         ProxyBroadcastEndpoint(int nodeId, BroadcastEndpoint 
rawBroadcastEndpoint) {
+            this.nodeId = nodeId;
+            this.rawBroadcastEndpoint = rawBroadcastEndpoint;
+         }
+
+         @Override
+         public void openClient() throws Exception {
+            open = true;
+            rawBroadcastEndpoint.openClient();
+         }
+
+         @Override
+         public void openBroadcaster() throws Exception {
+            open = true;
+            rawBroadcastEndpoint.openBroadcaster();
+         }
+
+         @Override
+         public void close(boolean isBroadcast) throws Exception {
+            open = false;
+            rawBroadcastEndpoint.close(isBroadcast);
+         }
+
+         @Override
+         public void broadcast(byte[] data) throws Exception {
+            rawBroadcastEndpoint.broadcast(data);
+         }
+
+         @Override
+         public byte[] receiveBroadcast() throws Exception {
+            return receiveBroadcast(Long.MAX_VALUE, TimeUnit.DAYS);
+         }
+
+         @Override
+         public byte[] receiveBroadcast(long time, TimeUnit unit) throws 
Exception {
+            while (open) {
+               byte[] data = rawBroadcastEndpoint.receiveBroadcast(time, unit);
+
+               ProxyBroadcastEndpointInterceptor interceptor = 
ProxyBroadcastEndpointFactory.interceptor;
+               if (interceptor == null || interceptor.allowBroadcast(nodeId, 
data)) {
+                  return data;
+               }
+            }
+            throw new EOFException();
+         }
+      }
    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to