This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch 2.19.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 696d26967d671bb2793d3d5ea920da1bb323b397 Author: Domenico Francesco Bruscino <[email protected]> AuthorDate: Mon Oct 18 16:48:59 2021 +0200 ARTEMIS-3495 Fix backup cluster controller connection loops Skip backup connector equivalent to cluster connector for cluster connections. (cherry picked from commit dca3facb55f6d848988d5a25b5640b92bad8b01d) --- .../core/client/impl/ClientSessionFactoryImpl.java | 4 +- .../core/client/impl/ServerLocatorImpl.java | 5 ++ .../artemis/core/server/cluster/BackupManager.java | 7 +++ .../core/server/cluster/ClusterController.java | 10 +++- .../core/server/cluster/ClusterManager.java | 2 +- .../server/cluster/impl/ClusterConnectionImpl.java | 8 +++ .../integration/cluster/failover/FailoverTest.java | 57 ++++++++++++++++++++++ 7 files changed, 89 insertions(+), 4 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 80a2428..48e70ff 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -289,7 +289,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C localConnector = connectorFactory.createConnector(currentConnectorConfig.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager); } - if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams())) { + if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams()) + // check if a server is trying to set its cluster connector config as backup connector config + && !(serverLocator.getClusterTransportConfiguration() != null && serverLocator.getClusterTransportConfiguration().isSameParams(backUp))) { if (logger.isDebugEnabled()) { logger.debug("Setting up backup config = " + backUp + " for live = " + live); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 363849f..62fbef1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -170,6 +170,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return discoveryGroup; } + /** For tests only */ + public Set<ClientSessionFactoryInternal> getFactories() { + return factories; + } + private final Exception traceException = new Exception(); private ServerLocatorConfig config = new ServerLocatorConfig(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java index 62b9563..1bf9919 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java @@ -195,6 +195,12 @@ public class BackupManager implements ActiveMQComponent { private volatile boolean announcingBackup; private volatile boolean backupAnnounced = false; + + public TransportConfiguration getConnector() { + return connector; + } + + @Override public String toString() { return "BackupConnector{" + "name='" + config.getName() + '\'' + ", connector=" + connector + '}'; @@ -363,6 +369,7 @@ public class BackupManager implements ActiveMQComponent { } ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs); locator.setClusterConnection(true); + locator.setClusterTransportConfiguration(getConnector()); locator.setRetryInterval(config.getRetryInterval()); locator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod()); locator.setConnectionTTL(config.getConnectionTTL()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index ec9f153..6547613 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -80,6 +80,11 @@ public class ClusterController implements ActiveMQComponent { private boolean started; private SimpleString replicatedClusterName; + /** For tests only */ + public ServerLocator getDefaultLocator() { + return defaultLocator; + } + public ClusterController(ActiveMQServer server, ScheduledExecutorService scheduledExecutor, boolean useQuorumManager) { @@ -207,9 +212,10 @@ public class ClusterController implements ActiveMQComponent { */ public void addClusterConnection(SimpleString name, TransportConfiguration[] tcConfigs, - ClusterConnectionConfiguration config) { + ClusterConnectionConfiguration config, + TransportConfiguration connector) { ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(tcConfigs); - configAndAdd(name, serverLocator, config, null); + configAndAdd(name, serverLocator, config, connector); } private void configAndAdd(SimpleString name, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java index 44a81e4..a617e23 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java @@ -639,7 +639,7 @@ public class ClusterManager implements ActiveMQComponent { clusterConnection = new ClusterConnectionImpl(this, tcConfigs, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicat [...] - clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config); + clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config, connector); } if (defaultClusterConnection == null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 2e3f8e3..8820885 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -179,6 +179,13 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn private boolean splitBrainDetection; + + /** For tests only */ + public ServerLocatorInternal getServerLocator() { + return serverLocator; + } + + public ClusterConnectionImpl(final ClusterManager manager, final TransportConfiguration[] staticTranspConfigs, final TransportConfiguration connector, @@ -1553,6 +1560,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs); locator.setClusterConnection(true); + locator.setClusterTransportConfiguration(connector); return locator; } return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java index bc6e151..73b7bf6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java @@ -46,12 +46,14 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.FailoverEventType; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext; @@ -59,6 +61,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; +import org.apache.activemq.artemis.core.server.cluster.BackupManager; +import org.apache.activemq.artemis.core.server.cluster.ClusterController; import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy; @@ -67,7 +71,9 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationBackupPolic import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; import org.apache.activemq.artemis.core.server.files.FileMoveManager; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; @@ -80,6 +86,7 @@ import org.apache.activemq.artemis.utils.RetryRule; import org.apache.activemq.artemis.utils.Wait; import org.jboss.logging.Logger; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -2459,6 +2466,56 @@ public class FailoverTest extends FailoverTestBase { Assert.assertEquals("message0", cm.getBodyBuffer().readString()); } + @Test(timeout = 120000) + public void testBackupConnections() throws Exception { + Assume.assumeTrue(backupServer.getServer().getHAPolicy().isBackup()); + + createSessionFactory(); + + CountDownLatch latch = new CountDownLatch(1); + sf.addFailoverListener(eventType -> { + if (eventType == FailoverEventType.FAILOVER_COMPLETED) { + latch.countDown(); + } + }); + + BackupManager backupManager = ((ActiveMQServerImpl)backupServer.getServer()).getBackupManager(); + ClusterController backupClusterController = backupServer.getServer().getClusterManager().getClusterController(); + ClusterConnectionImpl backupClusterConnection = (ClusterConnectionImpl)backupServer.getServer().getClusterManager().getClusterConnections().stream().findFirst().get(); + + for (BackupManager.BackupConnector backupConnector : backupManager.getBackupConnectors()) { + for (ClientSessionFactoryInternal factory : ((ServerLocatorImpl)backupConnector.getBackupServerLocator()).getFactories()) { + Assert.assertNotNull(factory.getConnection()); + } + } + + for (ClientSessionFactoryInternal factory : ((ServerLocatorImpl)backupClusterController.getDefaultLocator()).getFactories()) { + Assert.assertNotNull(factory.getConnection()); + } + + Assert.assertNull(backupClusterConnection.getServerLocator()); + + Assert.assertNotNull(sf.getConnection()); + + crash(); + + latch.await(); + + for (BackupManager.BackupConnector backupConnector : backupManager.getBackupConnectors()) { + Assert.assertNull(backupConnector.getBackupServerLocator()); + } + + for (ClientSessionFactoryInternal factory : ((ServerLocatorImpl)backupServer.getServer().getClusterManager().getClusterController().getDefaultLocator()).getFactories()) { + Assert.assertNull(factory.getConnection()); + } + + for (ClientSessionFactoryInternal factory : ((ServerLocatorImpl)backupClusterConnection.getServerLocator()).getFactories()) { + Assert.assertNull(factory.getConnection()); + } + + Assert.assertNotNull(sf.getConnection()); + } + // Package protected --------------------------------------------- // Protected -----------------------------------------------------
