This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new ae5208c543 ARTEMIS-5382 Scan cluster topology to merge subgroups
ae5208c543 is described below
commit ae5208c54301375d723999c16ab513cd9b897668
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Thu Apr 3 18:25:17 2025 +0200
ARTEMIS-5382 Scan cluster topology to merge subgroups
After the first cluster connection a broker only connects to the other
brokers
in its topology. Temporary network issues can cause cluster subgroups. In
case
of cluster subgroups the topology only contains the broker of its subgroup.
So
the cluster subgroups will remain separate.
---
.../api/config/ActiveMQDefaultConfiguration.java | 14 +-
.../core/client/impl/ServerLocatorImpl.java | 23 ++-
.../core/client/impl/ServerLocatorInternal.java | 3 +
.../config/ClusterConnectionConfiguration.java | 16 ++
.../deployers/impl/FileConfigurationParser.java | 4 +-
.../artemis/core/server/ActiveMQServerLogger.java | 5 +
.../core/server/cluster/ClusterManager.java | 4 +-
.../server/cluster/impl/ClusterConnectionImpl.java | 196 +++++++++++++++++++-
.../resources/schema/artemis-configuration.xsd | 8 +
.../core/config/impl/FileConfigurationTest.java | 3 +
.../impl/ClusterConnectionImplMockTest.java | 31 +++-
.../resources/ConfigurationTest-full-config.xml | 2 +
.../ConfigurationTest-xinclude-config.xml | 2 +
...-xinclude-schema-config-cluster-connections.xml | 2 +
.../cluster/distribution/SymmetricClusterTest.java | 201 ++++++++++++++++++++-
.../SymmetricClusterWithBackupTest.java | 10 +
.../SymmetricClusterWithDiscoveryTest.java | 129 ++++++++++++-
17 files changed, 627 insertions(+), 26 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 9546ab2f95..fef52adb16 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -703,14 +703,18 @@ public final class ActiveMQDefaultConfiguration {
// These properties used to defined with this prefix.
// I'm keeping the older property name in an attempt to guarantee
compatibility
private static final String FORMER_ACK_RETRY_CLASS_NAME =
"org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckRetry";
- private static final int DEFAULT_MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS =
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME +
".MIN_QUEUE_ATTEMPTS", "5"));;
- private static final int DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS =
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME +
".MAX_PAGE_ATTEMPT", "2"));;
+ private static final int DEFAULT_MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS =
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME +
".MIN_QUEUE_ATTEMPTS", "5"));
+ private static final int DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS =
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME +
".MAX_PAGE_ATTEMPT", "2"));
- private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY =
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME +
".RETRY_DELAY", "100"));;
+ private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY =
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME +
".RETRY_DELAY", "100"));
private static final boolean DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED =
false;
private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;
+
+ private static final int DEFAULT_CLUSTER_TOPOLOGY_SCANNER_ATTEMPTS = 30;
+
+
/**
* If {@code true} then the ActiveMQ Artemis Server will make use of any
Protocol Managers that are in available on
* the classpath. If false then only the core protocol will be available,
unless in Embedded mode where users can
@@ -1997,4 +2001,8 @@ public final class ActiveMQDefaultConfiguration {
public static int getInitialQueueBufferSize() {
return INITIAL_QUEUE_BUFFER_SIZE;
}
+
+ public static int getClusterTopologyScannerAttempts() {
+ return DEFAULT_CLUSTER_TOPOLOGY_SCANNER_ATTEMPTS;
+ }
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index f1fe47a867..35c518ff4c 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -187,6 +187,8 @@ public final class ServerLocatorImpl implements
ServerLocatorInternal, Discovery
private String passwordCodec;
+ private DiscoveryListener discoveryListener;
+
public static synchronized void clearThreadPools() {
ActiveMQClient.clearThreadPools();
}
@@ -440,6 +442,7 @@ public final class ServerLocatorImpl implements
ServerLocatorInternal, Discovery
groupID = locator.groupID;
nodeID = locator.nodeID;
clusterTransportConfiguration = locator.clusterTransportConfiguration;
+ discoveryListener = locator.discoveryListener;
}
private boolean useInitConnector() {
@@ -586,6 +589,12 @@ public final class ServerLocatorImpl implements
ServerLocatorInternal, Discovery
return afterConnectListener;
}
+ @Override
+ public ServerLocatorImpl setDiscoveryListener(DiscoveryListener
discoveryListener) {
+ this.discoveryListener = discoveryListener;
+ return this;
+ }
+
@Override
public ClientSessionFactory createSessionFactory(String nodeID) throws
Exception {
TopologyMember topologyMember = topology.getMember(nodeID);
@@ -1624,18 +1633,18 @@ public final class ServerLocatorImpl implements
ServerLocatorInternal, Discovery
@Override
public synchronized void connectorsChanged(List<DiscoveryEntry>
newConnectors) {
+ DiscoveryListener discoveryListener = this.discoveryListener;
+
if (receivedTopology) {
+ if (discoveryListener != null) {
+ discoveryListener.connectorsChanged(newConnectors);
+ }
return;
}
final List<TransportConfiguration> newInitialconnectors = new
ArrayList<>(newConnectors.size());
for (DiscoveryEntry entry : newConnectors) {
- if (ha && topology.getMember(entry.getNodeID()) == null) {
- TopologyMemberImpl member = new
TopologyMemberImpl(entry.getNodeID(), null, null, entry.getConnector(), null);
- // on this case we set it as zero as any update coming from server
should be accepted
- topology.updateMember(0, entry.getNodeID(), member);
- }
// ignore its own transport connector
if (!entry.getConnector().equals(clusterTransportConfiguration)) {
newInitialconnectors.add(entry.getConnector());
@@ -1661,6 +1670,10 @@ public final class ServerLocatorImpl implements
ServerLocatorInternal, Discovery
connectRunnable.run();
}
}
+
+ if (discoveryListener != null) {
+ discoveryListener.connectorsChanged(newConnectors);
+ }
}
@Override
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
index b39cfd58ae..4338d7a9b1 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.cluster.DiscoveryListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
public interface ServerLocatorInternal extends ServerLocator {
@@ -35,6 +36,8 @@ public interface ServerLocatorInternal extends ServerLocator {
ServerLocatorInternal
setAfterConnectionInternalListener(AfterConnectInternalListener listener);
+ ServerLocatorInternal setDiscoveryListener(DiscoveryListener listener);
+
/**
* Used to better identify Cluster Connection Locators on logs. To
facilitate eventual debugging.
* <p>
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
index 5cbbb13cb9..5b9d47dc3f 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java
@@ -85,6 +85,8 @@ public final class ClusterConnectionConfiguration implements
Serializable {
private String clientId;
+ private int topologyScannerAttempts =
ActiveMQDefaultConfiguration.getClusterTopologyScannerAttempts();
+
public ClusterConnectionConfiguration() {
}
@@ -318,6 +320,15 @@ public final class ClusterConnectionConfiguration
implements Serializable {
return this;
}
+ public int getTopologyScannerAttempts() {
+ return topologyScannerAttempts;
+ }
+
+ public ClusterConnectionConfiguration setTopologyScannerAttempts(int
topologyScannerAttempts) {
+ this.topologyScannerAttempts = topologyScannerAttempts;
+ return this;
+ }
+
/**
* This method will match the configuration and return the proper
TransportConfiguration for the Configuration
*/
@@ -402,6 +413,7 @@ public final class ClusterConnectionConfiguration
implements Serializable {
result = prime * result + (int) (temp ^ (temp >>> 32));
result = prime * result + ((staticConnectors == null) ? 0 :
staticConnectors.hashCode());
result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
+ result = prime * result + topologyScannerAttempts;
return result;
}
@@ -510,6 +522,9 @@ public final class ClusterConnectionConfiguration
implements Serializable {
} else if (!clientId.equals(other.clientId)) {
return false;
}
+ if (topologyScannerAttempts != other.topologyScannerAttempts) {
+ return false;
+ }
return true;
}
@@ -540,6 +555,7 @@ public final class ClusterConnectionConfiguration
implements Serializable {
", clusterNotificationInterval=" + clusterNotificationInterval +
", clusterNotificationAttempts=" + clusterNotificationAttempts +
", clientId=" + clientId +
+ ", topologyScannerInterval=" + topologyScannerAttempts +
'}';
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 31c7b40d6b..7d6839fffe 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -2360,6 +2360,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
int clusterNotificationAttempts = getInteger(e, "notification-attempts",
ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), GT_ZERO);
+ int topologyScannerAttempts = getInteger(e, "topology-scanner-attempts",
ActiveMQDefaultConfiguration.getClusterTopologyScannerAttempts(),
MINUS_ONE_OR_POSITIVE_INT);
+
String discoveryGroupName = null;
List<String> staticConnectorNames = new ArrayList<>();
@@ -2384,7 +2386,7 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
String clientId = getString(e, "client-id", null, NO_CHECK);
- ClusterConnectionConfiguration config = new
ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).s
[...]
+ ClusterConnectionConfiguration config = new
ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).s
[...]
if (discoveryGroupName == null) {
config.setStaticConnectors(staticConnectorNames);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 52767b92d9..29c92a329c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
+import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -1495,4 +1496,8 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224143, value = "Bridge {} failed to send {}: {} {}",
level = LogMessage.Level.WARN)
void bridgeFailedToSend(String bridgeName, String message, String
exceptionName, String exceptionMessage);
+
+ @LogMessage(id = 224144, value = "The topology of the cluster connection {}
doesn't include all th expected members. "
+ + "Check the discovery group or the static connectors of the cluster
connection if the topology is correct: {} / {}", level = LogMessage.Level.WARN)
+ void incompleteClusterTopology(String clusterConnection, Topology topology,
String topologyMembers);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index 4d86c25c2f..f1790300fb 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -616,7 +616,7 @@ public class ClusterManager implements ActiveMQComponent {
logger.debug("{} Starting a Discovery Group Cluster Connection,
name={}, dg={}", this, config.getDiscoveryGroupName(), dg);
}
- clusterConnection = new ClusterConnectionImpl(this, dg, connector,
SimpleString.of(config.getName()), SimpleString.of(config.getAddress() != null
? config.getAddress() : ""), config.getMinLargeMessageSize(),
config.getClientFailureCheckPeriod(), config.getConnectionTTL(),
config.getRetryInterval(), config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(), config.getInitialConnectAttempts(),
config.getReconnectAttempts(), config.getCallTimeout(),
config.getCallFailoverTi [...]
+ clusterConnection = new ClusterConnectionImpl(this, dg, connector,
SimpleString.of(config.getName()), SimpleString.of(config.getAddress() != null
? config.getAddress() : ""), config.getMinLargeMessageSize(),
config.getClientFailureCheckPeriod(), config.getConnectionTTL(),
config.getRetryInterval(), config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(), config.getInitialConnectAttempts(),
config.getReconnectAttempts(), config.getCallTimeout(),
config.getCallFailoverTi [...]
clusterController.addClusterConnection(clusterConnection.getName(),
dg, config, connector);
} else {
@@ -626,7 +626,7 @@ public class ClusterManager implements ActiveMQComponent {
logger.debug("{} defining cluster connection towards {}", this,
Arrays.toString(tcConfigs));
}
- clusterConnection = new ClusterConnectionImpl(this, tcConfigs,
connector, SimpleString.of(config.getName()),
SimpleString.of(config.getAddress()), config.getMinLargeMessageSize(),
config.getClientFailureCheckPeriod(), config.getConnectionTTL(),
config.getRetryInterval(), config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(), config.getInitialConnectAttempts(),
config.getReconnectAttempts(), config.getCallTimeout(),
config.getCallFailoverTimeout(), config.isDuplicateD [...]
+ clusterConnection = new ClusterConnectionImpl(this, tcConfigs,
connector, SimpleString.of(config.getName()),
SimpleString.of(config.getAddress()), config.getMinLargeMessageSize(),
config.getClientFailureCheckPeriod(), config.getConnectionTTL(),
config.getRetryInterval(), config.getRetryIntervalMultiplier(),
config.getMaxRetryInterval(), config.getInitialConnectAttempts(),
config.getReconnectAttempts(), config.getCallTimeout(),
config.getCallFailoverTimeout(), config.isDuplicateD [...]
clusterController.addClusterConnection(clusterConnection.getName(),
tcConfigs, config, connector);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 9ba6b838fe..7efaa628bf 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -18,11 +18,15 @@ package
org.apache.activemq.artemis.core.server.cluster.impl;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -48,12 +52,15 @@ import
org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyManager;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
+import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
+import org.apache.activemq.artemis.core.cluster.DiscoveryListener;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
@@ -78,8 +85,9 @@ import
org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicInteger;
-public final class ClusterConnectionImpl implements ClusterConnection,
AfterConnectInternalListener, TopologyManager {
+public final class ClusterConnectionImpl implements ClusterConnection,
AfterConnectInternalListener, TopologyManager, DiscoveryListener {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -183,6 +191,12 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
private final String clientId;
+ private volatile List<DiscoveryEntry> discoveryEntries;
+
+ private final TransportConfiguration[] staticTransportConfigurations;
+
+ private final TopologyScanner topologyScanner;
+
/**
* For tests only
*/
@@ -222,7 +236,8 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
final boolean allowDirectConnectionsOnly,
final long clusterNotificationInterval,
final int clusterNotificationAttempts,
- final String clientId) throws Exception {
+ final String clientId,
+ final int topologyScannerAttempts) throws
Exception {
this.nodeManager = nodeManager;
this.connector = connector;
@@ -307,6 +322,11 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
this.storeAndForwardPrefix = server.getInternalNamingPrefix() +
SN_PREFIX;
this.clientId = clientId;
+
+ this.staticTransportConfigurations = staticTranspConfigs;
+
+ this.topologyScanner = topologyScannerAttempts == 0 ? null : new
TopologyScanner(
+ scheduledExecutor, executor, topologyScannerAttempts);
}
public ClusterConnectionImpl(final ClusterManager manager,
@@ -340,7 +360,8 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
final boolean allowDirectConnectionsOnly,
final long clusterNotificationInterval,
final int clusterNotificationAttempts,
- final String clientId) throws Exception {
+ final String clientId,
+ final int topologyScannerAttempts) throws
Exception {
this.nodeManager = nodeManager;
this.connector = connector;
@@ -410,6 +431,11 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
this.storeAndForwardPrefix = server.getInternalNamingPrefix() +
SN_PREFIX;
this.clientId = clientId;
+
+ this.staticTransportConfigurations = null;
+
+ this.topologyScanner = topologyScannerAttempts == 0 ? null : new
TopologyScanner(
+ scheduledExecutor, executor, topologyScannerAttempts);
}
@Override
@@ -446,6 +472,11 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
if (serverLocator != null) {
serverLocator.removeClusterTopologyListener(this);
+ serverLocator.setDiscoveryListener(null);
+ }
+
+ if (topologyScanner != null) {
+ topologyScanner.stop();
}
if (logger.isDebugEnabled()) {
@@ -593,6 +624,164 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
// false,
// localMember.getConnector().a,
// localMember.getConnector().b);
+
+ if (topologyScanner != null && !stopping) {
+ topologyScanner.start();
+ topologyScanner.resetCounter();
+ topologyScanner.delay();
+ }
+ }
+
+ @Override
+ public void connectorsChanged(List<DiscoveryEntry> newConnectors) {
+ discoveryEntries = newConnectors;
+
+ if (topologyScanner != null && !stopping && topologyScanner.isStarted())
{
+ topologyScanner.resetCounter();
+ topologyScanner.delay();
+ }
+ }
+
+ public TopologyScanner getTopologyScanner() {
+ return topologyScanner;
+ }
+
+ public final class TopologyScanner extends ActiveMQScheduledComponent {
+
+ private final int attempts;
+
+ private final AtomicInteger counter = new AtomicInteger();
+
+ private volatile boolean running = false;
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ TopologyScanner(ScheduledExecutorService scheduledExecutorService,
Executor executor, int attempts) {
+ super(scheduledExecutorService, executor, retryInterval,
TimeUnit.MILLISECONDS, true);
+ this.attempts = attempts;
+ }
+
+ @Override
+ public boolean delay() {
+ running = true;
+
+ return super.delay();
+ }
+
+ public void resetCounter() {
+ counter.set(0);
+ }
+
+ @Override
+ public void run() {
+ TransportConfiguration[] transportConfigurations = null;
+
+ if (staticTransportConfigurations != null) {
+ transportConfigurations = staticTransportConfigurations;
+ } else {
+ List<DiscoveryEntry> discoveredEntries = discoveryEntries;
+
+ if (discoveredEntries != null) {
+ transportConfigurations = discoveryEntries.stream()
+ .map(discoveryEntry -> discoveryEntry.getConnector())
+ .toArray(TransportConfiguration[]::new);
+ } else {
+ logger.debug("No discovered entries");
+ }
+ }
+
+ if (transportConfigurations != null) {
+ boolean topologyUpdated = updateTopology(transportConfigurations);
+
+ int topologyScannerCount = counter.incrementAndGet();
+
+ boolean retry = (attempts == -1 || topologyScannerCount <
attempts);
+
+ if (!topologyUpdated && !stopping && retry) {
+ delay();
+ } else {
+ running = false;
+
+ if (!topologyUpdated && !stopping) {
+
ActiveMQServerLogger.LOGGER.incompleteClusterTopology(name.toString(),
+ topology, topology.getMembers().toString());
+ }
+ }
+ }
+ }
+
+ private boolean updateTopology(TransportConfiguration[]
transportConfigurations) {
+ boolean result = true;
+
+ for (TransportConfiguration transportConfiguration :
transportConfigurations) {
+ if (!topology.getMembers().stream().anyMatch(member ->
compareTCs(connector, transportConfiguration) ||
+ member.getPrimary() != null && compareTCs(member.getPrimary(),
transportConfiguration) ||
+ member.getBackup() != null && compareTCs(member.getBackup(),
transportConfiguration))) {
+
+ try (ServerLocatorInternal targetLocator = new
ServerLocatorImpl(topology, true, transportConfiguration)) {
+ targetLocator.setReconnectAttempts(0);
+ targetLocator.setInitialConnectAttempts(0);
+ targetLocator.setConnectionTTL(connectionTTL);
+ targetLocator.setCallTimeout(callTimeout);
+ targetLocator.setNodeID(nodeManager.getNodeId().toString());
+ targetLocator.setClusterTransportConfiguration(connector);
+ targetLocator.setIdentity("(Cluster-topology-scanner::" +
server.toString() + ")");
+
+ try {
+ try (ClientSessionFactoryInternal
targetClientSessionFactory = targetLocator.connect()) {
+ boolean waitForTopology =
targetClientSessionFactory.waitForTopology(
+ callTimeout, TimeUnit.MILLISECONDS);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Cluster topology scanner
waitForTopology from {}: {}",
+ transportConfiguration, waitForTopology);
+ }
+ }
+ } catch (ActiveMQException e) {
+ result = false;
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Cluster topology scanner failed to
connect to {}: {}",
+ transportConfiguration, e);
+ }
+ }
+ }
+ }
+ }
+
+ return result;
+ }
+
+ protected static boolean compareTCs(TransportConfiguration config,
TransportConfiguration otherConfig) {
+ if (config.getFactoryClassName().contains("Netty") &&
otherConfig.getFactoryClassName().contains("Netty")) {
+ return Objects.equals(config.getParams().get("port"),
otherConfig.getParams().get("port")) &&
+ compareHosts((String)config.getParams().get("host"),
(String)otherConfig.getParams().get("host"));
+ } else if (config.getFactoryClassName().contains("InVM") &&
otherConfig.getFactoryClassName().contains("InVM")) {
+ return Objects.equals(config.getParams().get("serverId"),
otherConfig.getParams().get("serverId"));
+ }
+
+ return false;
+ }
+
+ private static boolean compareHosts(String host, String otherHost) {
+ if (Objects.equals(host, otherHost)) {
+ return true;
+ }
+
+ if (host != null && otherHost != null) {
+ try {
+ InetAddress hostAddr = InetAddress.getByName(host);
+ InetAddress otherHostAddr = InetAddress.getByName(otherHost);
+ return hostAddr.equals(otherHostAddr);
+ } catch (UnknownHostException e) {
+ logger.debug("Error resolving hosts: {}", e);
+ }
+ }
+
+ return false;
+ }
}
@Override
@@ -704,6 +893,7 @@ public final class ClusterConnectionImpl implements
ClusterConnection, AfterConn
}
serverLocator.setAfterConnectionInternalListener(this);
+ serverLocator.setDiscoveryListener(this);
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator,
server.getStorageManager()));
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 1b3698788b..2329f4ad3e 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2630,6 +2630,14 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="topology-scanner-attempts" type="xsd:int"
default="-1" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ maximum number of topology scanner attempts, -1 means 'no
limits'
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element ref="discovery-type" maxOccurs="1" minOccurs="0"/>
</xsd:all>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index ec7a6031c6..799a364112 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -464,6 +464,7 @@ public class FileConfigurationTest extends
AbstractConfigurationTestBase {
assertEquals(ActiveMQDefaultConfiguration.getDefaultClusterCallTimeout(),
ccc.getCallTimeout());
assertEquals(ActiveMQDefaultConfiguration.getDefaultClusterCallFailoverTimeout(),
ccc.getCallFailoverTimeout());
assertEquals("myClientID", ccc.getClientId());
+
assertEquals(ActiveMQDefaultConfiguration.getClusterTopologyScannerAttempts(),
ccc.getTopologyScannerAttempts());
} else if (ccc.getName().equals("cluster-connection1")) {
assertEquals("cluster-connection1", ccc.getName());
assertEquals(321, ccc.getMinLargeMessageSize(),
"clusterConnectionConf minLargeMessageSize");
@@ -484,6 +485,7 @@ public class FileConfigurationTest extends
AbstractConfigurationTestBase {
assertNull(ccc.getDiscoveryGroupName());
assertEquals(222, ccc.getProducerWindowSize());
assertTrue(ccc.isAllowDirectConnectionsOnly());
+ assertEquals(-1, ccc.getTopologyScannerAttempts());
} else {
assertEquals("cluster-connection2", ccc.getName());
assertEquals("queues2", ccc.getAddress());
@@ -497,6 +499,7 @@ public class FileConfigurationTest extends
AbstractConfigurationTestBase {
assertEquals("dg1", ccc.getDiscoveryGroupName());
assertEquals(333, ccc.getProducerWindowSize());
assertFalse(ccc.isAllowDirectConnectionsOnly());
+ assertEquals(30, ccc.getTopologyScannerAttempts());
}
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImplMockTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImplMockTest.java
index ca90c737a3..d6501e5dbb 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImplMockTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImplMockTest.java
@@ -19,13 +19,16 @@ package
org.apache.activemq.artemis.core.server.cluster.impl;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Map;
import java.util.concurrent.Executors;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
+import
org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.NodeManager;
@@ -87,7 +90,8 @@ public class ClusterConnectionImplMockTest extends
ServerTestBase {
true, //final boolean allowDirectConnectionsOnly,
0, //final long clusterNotificationInterval,
0, //final int clusterNotificationAttempts)
- null
+ null,
+ 0
);
assertEquals(1, cci.allowableConnections.size());
@@ -106,7 +110,7 @@ public class ClusterConnectionImplMockTest extends
ServerTestBase {
ArtemisExecutor executor =
ArtemisExecutor.delegate(Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())));
try {
- ClusterConnectionImpl cci = new ClusterConnectionImpl(null, new
TransportConfiguration[]{tc}, null, null, null, 0, 0L, 0L, 0L, 0, 0L, 0, 0, 0L,
0L, false, null, 0, 0, () -> executor, new MockServer(), null, null, null, 0,
new FakeNodeManager(UUIDGenerator.getInstance().generateStringUUID()), null,
null, true, 0, 0, null);
+ ClusterConnectionImpl cci = new ClusterConnectionImpl(null, new
TransportConfiguration[]{tc}, null, null, null, 0, 0L, 0L, 0L, 0, 0L, 0, 0, 0L,
0L, false, null, 0, 0, () -> executor, new MockServer(), null, null, null, 0,
new FakeNodeManager(UUIDGenerator.getInstance().generateStringUUID()), null,
null, true, 0, 0, null, 0);
TopologyMember topologyMember = new
TopologyMemberImpl(RandomUtil.randomUUIDString(), null, null, null, null);
cci.nodeUP(topologyMember, false);
@@ -115,6 +119,29 @@ public class ClusterConnectionImplMockTest extends
ServerTestBase {
}
}
+ @Test
+ public void testCompareTCs() {
+ TransportConfiguration netty1 = new
TransportConfiguration(NettyConnectorFactory.class.getName(), Map.of("host",
"localhost", "port", 61616));
+ assertTrue(ClusterConnectionImpl.TopologyScanner.compareTCs(netty1,
netty1));
+
+ TransportConfiguration netty2 = new
TransportConfiguration(NettyConnectorFactory.class.getName(), Map.of("host",
"127.0.0.1", "port", 61616));
+ assertTrue(ClusterConnectionImpl.TopologyScanner.compareTCs(netty2,
netty2));
+
+ assertTrue(ClusterConnectionImpl.TopologyScanner.compareTCs(netty1,
netty2));
+
+ TransportConfiguration netty3 = new
TransportConfiguration(NettyConnectorFactory.class.getName(), Map.of("host",
"my-host", "port", 61617));
+ assertTrue(ClusterConnectionImpl.TopologyScanner.compareTCs(netty3,
netty3));
+
+ TransportConfiguration netty4 = new
TransportConfiguration(NettyConnectorFactory.class.getName(), Map.of("host",
"192.168.0.1", "port", 61617));
+ assertTrue(ClusterConnectionImpl.TopologyScanner.compareTCs(netty4,
netty4));
+
+ assertFalse(ClusterConnectionImpl.TopologyScanner.compareTCs(netty1,
netty3));
+ assertFalse(ClusterConnectionImpl.TopologyScanner.compareTCs(netty1,
netty4));
+ assertFalse(ClusterConnectionImpl.TopologyScanner.compareTCs(netty2,
netty3));
+ assertFalse(ClusterConnectionImpl.TopologyScanner.compareTCs(netty2,
netty4));
+ assertFalse(ClusterConnectionImpl.TopologyScanner.compareTCs(netty3,
netty4));
+ }
+
static final class MockServer extends ActiveMQServerImpl {
@Override
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 03d54aeacc..be6d3684af 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -440,6 +440,7 @@
<connector-ref>connector1</connector-ref>
<connector-ref>connector2</connector-ref>
</static-connectors>
+ <topology-scanner-attempts>-1</topology-scanner-attempts>
</cluster-connection>
<cluster-connection name="cluster-connection2">
<discovery-group-ref discovery-group-name="dg1"/>
@@ -452,6 +453,7 @@
<message-load-balancing>STRICT</message-load-balancing>
<max-hops>2</max-hops>
<producer-window-size>333</producer-window-size>
+ <topology-scanner-attempts>30</topology-scanner-attempts>
</cluster-connection>
<cluster-connection name="cluster-connection3">
<connector-ref>connector2</connector-ref>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 33e8f8de60..fd426361a2 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -328,6 +328,7 @@
<connector-ref>connector1</connector-ref>
<connector-ref>connector2</connector-ref>
</static-connectors>
+ <topology-scanner-attempts>-1</topology-scanner-attempts>
</cluster-connection>
<cluster-connection name="cluster-connection2">
<address>queues2</address>
@@ -340,6 +341,7 @@
<producer-window-size>333</producer-window-size>
<call-failover-timeout>456</call-failover-timeout>
<discovery-group-ref discovery-group-name="dg1"/>
+ <topology-scanner-attempts>30</topology-scanner-attempts>
</cluster-connection>
<cluster-connection name="cluster-connection3">
<connector-ref>connector2</connector-ref>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-cluster-connections.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-cluster-connections.xml
index b4fe4468a6..59912341d6 100644
---
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-cluster-connections.xml
+++
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-cluster-connections.xml
@@ -35,6 +35,7 @@
<connector-ref>connector1</connector-ref>
<connector-ref>connector2</connector-ref>
</static-connectors>
+ <topology-scanner-attempts>-1</topology-scanner-attempts>
</cluster-connection>
<cluster-connection name="cluster-connection2">
<address>queues2</address>
@@ -47,6 +48,7 @@
<producer-window-size>333</producer-window-size>
<call-failover-timeout>456</call-failover-timeout>
<discovery-group-ref discovery-group-name="dg1"/>
+ <topology-scanner-attempts>30</topology-scanner-attempts>
</cluster-connection>
<cluster-connection name="cluster-connection3">
<connector-ref>connector2</connector-ref>
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterTest.java
index 5181233883..6226b2f9df 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterTest.java
@@ -19,13 +19,28 @@ package
org.apache.activemq.artemis.tests.integration.cluster.distribution;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
+import
org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
+import
org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
+import
org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
+import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.Connector;
+import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.BeforeEach;
@@ -1396,24 +1411,42 @@ public class SymmetricClusterTest extends
ClusterTestBase {
@Test
public void testStartStopServers() throws Exception {
- doTestStartStopServers();
+ doTestStartStopServers(false);
}
- protected void validateTopologSize(int expectedSize, int...
serverParameters) throws Exception {
+ @Test
+ public void testStartStopServersWithPartition() throws Exception {
+ doTestStartStopServers(true);
+ }
+
+ protected void validateTopologySize(int expectedSize, int...
serverParameters) throws Exception {
for (int s : serverParameters) {
logger.debug("Checking topology size on node {}, expecting it to be
{}", s, expectedSize);
assertNotNull(servers[s], "Server[" + s + "] is null");
for (ClusterConnection c :
servers[s].getClusterManager().getClusterConnections()) {
- Wait.assertEquals(expectedSize, () ->
c.getTopology().getMembers().size(), 5000);
+ Wait.assertEquals(expectedSize, () ->
c.getTopology().getMembers().size(), 5000, Wait.SLEEP_MILLIS, () -> {
+ Collection<TopologyMemberImpl> members =
c.getTopology().getMembers();
+ return "SRV[" + s + "]/CC[" + c.getName() + "] has " +
members.size() + "/" + expectedSize + " members:\n" +
+ String.join("\n", members.stream().map(topologyMember ->
topologyMember.toString()).collect(Collectors.toList()));
+ });
}
}
}
- public void doTestStartStopServers() throws Exception {
+ public void doTestStartStopServers(boolean withPartition) throws Exception {
setupCluster();
+ if (withPartition) {
+ setupProxy();
+ enablePartition();
+
+ for (int node = 0; node < 5; node++) {
+
getServer(node).getConfiguration().getClusterConfigurations().get(0).setTopologyScannerAttempts(-1);
+ }
+ }
+
startServers();
setupSessionFactory(0, isNetty());
@@ -1422,7 +1455,22 @@ public class SymmetricClusterTest extends
ClusterTestBase {
setupSessionFactory(3, isNetty());
setupSessionFactory(4, isNetty());
- validateTopologSize(5, 0, 1, 2, 3, 4);
+ if (withPartition) {
+ validateTopologySize(3, 0, 1, 2);
+ validateTopologySize(2, 3, 4);
+
+ disablePartition();
+ }
+
+ validateTopologySize(5, 0, 1, 2, 3, 4);
+
+ if (withPartition) {
+ for (int node = 0; node < 5; node++) {
+ final int serverNode = node;
+ Wait.assertTrue(() ->
!((ClusterConnectionImpl)getServer(serverNode).getClusterManager().
+ getClusterConnection("cluster" +
serverNode)).getTopologyScanner().isRunning());
+ }
+ }
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue1", null, false);
@@ -1535,16 +1583,16 @@ public class SymmetricClusterTest extends
ClusterTestBase {
stopServers(0, 3);
- validateTopologSize(3, 1, 2, 4);
+ validateTopologySize(3, 1, 2, 4);
startServers(3, 0);
- validateTopologSize(5, 0, 1, 2, 3, 4);
+ validateTopologySize(5, 0, 1, 2, 3, 4);
setupSessionFactory(0, isNetty());
setupSessionFactory(3, isNetty());
- validateTopologSize(5, 0, 1, 2, 3, 4);
+ validateTopologySize(5, 0, 1, 2, 3, 4);
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(3, "queues.testaddress", "queue3", null, false);
@@ -2010,4 +2058,141 @@ public class SymmetricClusterTest extends
ClusterTestBase {
protected boolean isFileStorage() {
return false;
}
+
+ protected void setupProxy() {
+ setupProxy(0);
+ setupProxy(1);
+ setupProxy(2);
+ setupProxy(3);
+ setupProxy(4);
+ }
+
+ protected void setupProxy(final int nodeId) {
+
getServer(nodeId).getConfiguration().getConnectorConfigurations().forEach((s,
transportConfiguration) -> {
+ transportConfiguration.getExtraParams().put("proxyNodeId", nodeId);
+ if (isNetty()) {
+
transportConfiguration.setFactoryClassName(NettyProxyConnectorFactory.class.getName());
+ } else {
+
transportConfiguration.setFactoryClassName(InVMProxyConnectorFactory.class.getName());
+ }
+ });
+ }
+
+ protected void enablePartition() {
+ ProxyConnectorFactory.interceptor = (nodeId, configuration) -> {
+ int partitionId = (nodeId % 5) / 3;
+ int targetPartitionId;
+
+ if (isNetty()) {
+ int targetPort =
(int)configuration.get(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME);
+ targetPartitionId = ((targetPort -
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT)
% 5) / 3;
+ } else {
+ int targetServerId =
(int)configuration.get(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME);
+ targetPartitionId = (targetServerId % 5) / 3;
+ }
+
+ return partitionId == targetPartitionId;
+ };
+ }
+
+ protected void disablePartition() {
+ ProxyConnectorFactory.interceptor = null;
+ }
+
+ public interface ProxyConnectorInterceptor {
+ boolean allowConnection(int nodeId, Map<String, Object> configuration);
+ }
+
+ public static class InVMProxyConnectorFactory extends ProxyConnectorFactory
{
+ public InVMProxyConnectorFactory() {
+ super(new InVMConnectorFactory());
+ }
+ }
+
+ public static class NettyProxyConnectorFactory extends
ProxyConnectorFactory {
+ public NettyProxyConnectorFactory() {
+ super(new NettyConnectorFactory());
+ }
+ }
+
+ public abstract static class ProxyConnectorFactory implements
ConnectorFactory {
+ public static volatile ProxyConnectorInterceptor interceptor;
+
+ private ConnectorFactory rawConnectorFactory;
+
+ public ProxyConnectorFactory(ConnectorFactory rawConnectorFactory) {
+ this.rawConnectorFactory = rawConnectorFactory;
+ }
+
+ @Override
+ public Map<String, Object> getDefaults() {
+ return rawConnectorFactory.getDefaults();
+ }
+
+ @Override
+ public Connector createConnector(Map<String, Object> configuration,
+ BufferHandler handler,
+ ClientConnectionLifeCycleListener listener,
+ Executor closeExecutor,
+ Executor threadPool,
+ ScheduledExecutorService scheduledThreadPool,
+ ClientProtocolManager protocolManager) {
+
+ return new ProxyConnector(rawConnectorFactory.createConnector(
+ configuration,
+ handler,
+ listener,
+ closeExecutor,
+ threadPool,
+ scheduledThreadPool,
+ protocolManager), configuration);
+ }
+
+ @Override
+ public boolean isReliable() {
+ return rawConnectorFactory.isReliable();
+ }
+
+ private class ProxyConnector implements Connector {
+ private Connector rawConnector;
+
+ private Map<String, Object> configuration;
+
+ ProxyConnector(Connector rawConnector, Map<String, Object>
configuration) {
+ this.rawConnector = rawConnector;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void start() {
+ rawConnector.start();
+ }
+
+ @Override
+ public void close() {
+ rawConnector.close();
+ }
+
+ @Override
+ public boolean isStarted() {
+ return rawConnector.isStarted();
+ }
+
+ @Override
+ public Connection createConnection() {
+
+ ProxyConnectorInterceptor interceptor =
ProxyConnectorFactory.interceptor;
+ if (interceptor == null ||
interceptor.allowConnection((int)configuration.get("proxyNodeId"),
configuration)) {
+ return rawConnector.createConnection();
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean isEquivalent(Map<String, Object> configuration) {
+ return rawConnector.isEquivalent(configuration);
+ }
+ }
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
index 6db7332038..b423cce67d 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
@@ -490,4 +490,14 @@ public class SymmetricClusterWithBackupTest extends
SymmetricClusterTest {
stopServers(5, 6, 7, 8, 9, 0, 1, 2, 3, 4);
}
+ @Override
+ protected void setupProxy() {
+ super.setupProxy();
+
+ setupProxy(5);
+ setupProxy(6);
+ setupProxy(7);
+ setupProxy(8);
+ setupProxy(9);
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
index ab4b5133f9..290c4461c2 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
@@ -16,6 +16,14 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+import java.io.EOFException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
+import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.SimpleString;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.jupiter.api.Test;
@@ -35,7 +43,14 @@ public class SymmetricClusterWithDiscoveryTest extends
SymmetricClusterTest {
@Test
public void testStartStopServers() throws Exception {
// When using discovery starting and stopping it too fast could have a
race condition with UDP
- doTestStartStopServers();
+ doTestStartStopServers(false);
+ }
+
+ @Override
+ @Test
+ public void testStartStopServersWithPartition() throws Exception {
+ // When using discovery starting and stopping it too fast could have a
race condition with UDP
+ doTestStartStopServers(true);
}
@Override
@@ -71,6 +86,116 @@ public class SymmetricClusterWithDiscoveryTest extends
SymmetricClusterTest {
*/
@Test
public void testStartStopServersWithPauseBeforeRestarting() throws
Exception {
- doTestStartStopServers();
+ doTestStartStopServers(false);
+ }
+
+ @Override
+ protected void setupProxy(int nodeId) {
+
getServer(nodeId).getConfiguration().getDiscoveryGroupConfigurations().get("dg1").
+ setBroadcastEndpointFactory(new
ProxyBroadcastEndpointFactory(nodeId, getServer(nodeId).
+
getConfiguration().getDiscoveryGroupConfigurations().get("dg1").getBroadcastEndpointFactory()));
+ }
+
+ @Override
+ protected void enablePartition() {
+ super.enablePartition();
+
+ ProxyBroadcastEndpointFactory.interceptor = (nodeId, data) -> {
+ ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(data);
+ SimpleString originatingNodeID = SimpleString.of(buffer.readString());
+
+ for (int i = 0; i < servers.length; i++) {
+ if (servers[i] != null &&
originatingNodeID.equals(servers[i].getNodeID())) {
+ int partitionId = (i % 5) / 3;
+ int targetPartitionId = (nodeId % 5) / 3;
+
+ return partitionId == targetPartitionId;
+ }
+ }
+
+ return false;
+ };
+ }
+
+ @Override
+ protected void disablePartition() {
+ super.disablePartition();
+
+ ProxyBroadcastEndpointFactory.interceptor = null;
+ }
+
+ public interface ProxyBroadcastEndpointInterceptor {
+ boolean allowBroadcast(int nodeId, byte[] data);
+ }
+
+ public class ProxyBroadcastEndpointFactory implements
BroadcastEndpointFactory {
+
+ public static volatile ProxyBroadcastEndpointInterceptor interceptor;
+
+ private int nodeId;
+ private final BroadcastEndpointFactory rawBroadcastEndpointFactory;
+
+ public ProxyBroadcastEndpointFactory(int nodeId,
BroadcastEndpointFactory rawBroadcastEndpointFactory) {
+ this.nodeId = nodeId;
+ this.rawBroadcastEndpointFactory = rawBroadcastEndpointFactory;
+ }
+
+ @Override
+ public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
+ return new ProxyBroadcastEndpoint(nodeId,
rawBroadcastEndpointFactory.createBroadcastEndpoint());
+ }
+
+ private class ProxyBroadcastEndpoint implements BroadcastEndpoint {
+
+ private int nodeId;
+ private volatile boolean open;
+ private final BroadcastEndpoint rawBroadcastEndpoint;
+
+ ProxyBroadcastEndpoint(int nodeId, BroadcastEndpoint
rawBroadcastEndpoint) {
+ this.nodeId = nodeId;
+ this.rawBroadcastEndpoint = rawBroadcastEndpoint;
+ }
+
+ @Override
+ public void openClient() throws Exception {
+ open = true;
+ rawBroadcastEndpoint.openClient();
+ }
+
+ @Override
+ public void openBroadcaster() throws Exception {
+ open = true;
+ rawBroadcastEndpoint.openBroadcaster();
+ }
+
+ @Override
+ public void close(boolean isBroadcast) throws Exception {
+ open = false;
+ rawBroadcastEndpoint.close(isBroadcast);
+ }
+
+ @Override
+ public void broadcast(byte[] data) throws Exception {
+ rawBroadcastEndpoint.broadcast(data);
+ }
+
+ @Override
+ public byte[] receiveBroadcast() throws Exception {
+ return receiveBroadcast(Long.MAX_VALUE, TimeUnit.DAYS);
+ }
+
+ @Override
+ public byte[] receiveBroadcast(long time, TimeUnit unit) throws
Exception {
+ while (open) {
+ byte[] data = rawBroadcastEndpoint.receiveBroadcast(time, unit);
+
+ ProxyBroadcastEndpointInterceptor interceptor =
ProxyBroadcastEndpointFactory.interceptor;
+ if (interceptor == null || interceptor.allowBroadcast(nodeId,
data)) {
+ return data;
+ }
+ }
+ throw new EOFException();
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact