This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 6c7231928ffdd1b057c51a306ba55649446d74a1 Author: franz1981 <[email protected]> AuthorDate: Wed Feb 24 17:43:51 2021 +0100 ARTEMIS-3138 Shared Nothing Live broker shouldn't try to connect to itself (2) --- .../core/client/impl/ServerLocatorImpl.java | 12 ++++++----- .../core/server/cluster/ClusterController.java | 23 +++++++++++----------- .../core/server/cluster/ClusterManager.java | 2 +- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 5a436ab..2ac0fd3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -1546,20 +1546,22 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (receivedTopology) { return; } - TransportConfiguration[] newInitialconnectors = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, newConnectors.size()); - int count = 0; - for (DiscoveryEntry entry : newConnectors) { - newInitialconnectors[count++] = entry.getConnector(); + final List<TransportConfiguration> newInitialconnectors = new ArrayList<>(newConnectors.size()); + for (DiscoveryEntry entry : newConnectors) { if (ha && topology.getMember(entry.getNodeID()) == null) { TopologyMemberImpl member = new TopologyMemberImpl(entry.getNodeID(), null, null, entry.getConnector(), null); // on this case we set it as zero as any update coming from server should be accepted topology.updateMember(0, entry.getNodeID(), member); } + // ignore its own transport connector + if (!entry.getConnector().equals(clusterTransportConfiguration)) { + newInitialconnectors.add(entry.getConnector()); + } } - this.initialConnectors = newInitialconnectors.length 
== 0 ? null : newInitialconnectors; + this.initialConnectors = newInitialconnectors.toArray(new TransportConfiguration[newInitialconnectors.size()]); if (clusterConnection && !receivedTopology && this.getNumInitialConnectors() > 0) { // The node is alone in the cluster. We create a connection to the new node diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index 6322db7..6ef9f26 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -158,15 +158,17 @@ public class ClusterController implements ActiveMQComponent { /** * add a locator for a cluster connection. * - * @param name the cluster connection name - * @param dg the discovery group to use - * @param config the cluster connection config + * @param name the cluster connection name + * @param dg the discovery group to use + * @param config the cluster connection config + * @param connector the cluster connector configuration */ public void addClusterConnection(SimpleString name, DiscoveryGroupConfiguration dg, - ClusterConnectionConfiguration config) { + ClusterConnectionConfiguration config, + TransportConfiguration connector) { ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(dg); - configAndAdd(name, serverLocator, config); + configAndAdd(name, serverLocator, config, connector); } /** @@ -179,12 +181,13 @@ public class ClusterController implements ActiveMQComponent { TransportConfiguration[] tcConfigs, ClusterConnectionConfiguration config) { ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(tcConfigs); - configAndAdd(name, serverLocator, config); + configAndAdd(name, serverLocator, config, 
null); } private void configAndAdd(SimpleString name, ServerLocatorInternal serverLocator, - ClusterConnectionConfiguration config) { + ClusterConnectionConfiguration config, + TransportConfiguration connector) { serverLocator.setConnectionTTL(config.getConnectionTTL()); serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod()); //if the cluster isn't available we want to hang around until it is @@ -198,10 +201,8 @@ public class ClusterController implements ActiveMQComponent { //this is used for replication so need to use the server packet decoder serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager())); serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool()); - SimpleString nodeID = server.getNodeID(); - if (nodeID != null) { - // this is used to allow a live server to ignore it's same connector ref - serverLocator.setNodeID(nodeID.toString()); + if (connector != null) { + serverLocator.setClusterTransportConfiguration(connector); } try { serverLocator.initialize(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java index 522c2d2..ac48302 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java @@ -609,7 +609,7 @@ public final class ClusterManager implements ActiveMQComponent { clusterConnection = new ClusterConnectionImpl(this, dg, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress() != null ? 
config.getAddress() : ""), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailover [...] - clusterController.addClusterConnection(clusterConnection.getName(), dg, config); + clusterController.addClusterConnection(clusterConnection.getName(), dg, config, connector); } else { TransportConfiguration[] tcConfigs = config.getTransportConfigurations(configuration);
