This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.19.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 696d26967d671bb2793d3d5ea920da1bb323b397
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Mon Oct 18 16:48:59 2021 +0200

    ARTEMIS-3495 Fix backup cluster controller connection loops
    
    Skip backup connector equivalent to cluster connector for cluster 
connections.
    
    (cherry picked from commit dca3facb55f6d848988d5a25b5640b92bad8b01d)
---
 .../core/client/impl/ClientSessionFactoryImpl.java |  4 +-
 .../core/client/impl/ServerLocatorImpl.java        |  5 ++
 .../artemis/core/server/cluster/BackupManager.java |  7 +++
 .../core/server/cluster/ClusterController.java     | 10 +++-
 .../core/server/cluster/ClusterManager.java        |  2 +-
 .../server/cluster/impl/ClusterConnectionImpl.java |  8 +++
 .../integration/cluster/failover/FailoverTest.java | 57 ++++++++++++++++++++++
 7 files changed, 89 insertions(+), 4 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 80a2428..48e70ff 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -289,7 +289,9 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
          localConnector = 
connectorFactory.createConnector(currentConnectorConfig.getParams(), new 
DelegatingBufferHandler(), this, closeExecutor, threadPool, 
scheduledThreadPool, clientProtocolManager);
       }
 
-      if (localConnector.isEquivalent(live.getParams()) && backUp != null && 
!localConnector.isEquivalent(backUp.getParams())) {
+      if (localConnector.isEquivalent(live.getParams()) && backUp != null && 
!localConnector.isEquivalent(backUp.getParams())
+         // check if a server is trying to set its cluster connector config as 
backup connector config
+         && !(serverLocator.getClusterTransportConfiguration() != null && 
serverLocator.getClusterTransportConfiguration().isSameParams(backUp))) {
          if (logger.isDebugEnabled()) {
             logger.debug("Setting up backup config = " + backUp + " for live = 
" + live);
          }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 363849f..62fbef1 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -170,6 +170,11 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
       return discoveryGroup;
    }
 
+   /** For tests only */
+   public Set<ClientSessionFactoryInternal> getFactories() {
+      return factories;
+   }
+
    private final Exception traceException = new Exception();
 
    private ServerLocatorConfig config = new ServerLocatorConfig();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
index 62b9563..1bf9919 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
@@ -195,6 +195,12 @@ public class BackupManager implements ActiveMQComponent {
       private volatile boolean announcingBackup;
       private volatile boolean backupAnnounced = false;
 
+
+      public TransportConfiguration getConnector() {
+         return connector;
+      }
+
+
       @Override
       public String toString() {
          return "BackupConnector{" + "name='" + config.getName() + '\'' + ", 
connector=" + connector + '}';
@@ -363,6 +369,7 @@ public class BackupManager implements ActiveMQComponent {
             }
             ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, 
tcConfigs);
             locator.setClusterConnection(true);
+            locator.setClusterTransportConfiguration(getConnector());
             locator.setRetryInterval(config.getRetryInterval());
             
locator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
             locator.setConnectionTTL(config.getConnectionTTL());
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index ec9f153..6547613 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -80,6 +80,11 @@ public class ClusterController implements ActiveMQComponent {
    private boolean started;
    private SimpleString replicatedClusterName;
 
+   /** For tests only */
+   public ServerLocator getDefaultLocator() {
+      return defaultLocator;
+   }
+
    public ClusterController(ActiveMQServer server,
                             ScheduledExecutorService scheduledExecutor,
                             boolean useQuorumManager) {
@@ -207,9 +212,10 @@ public class ClusterController implements 
ActiveMQComponent {
     */
    public void addClusterConnection(SimpleString name,
                                     TransportConfiguration[] tcConfigs,
-                                    ClusterConnectionConfiguration config) {
+                                    ClusterConnectionConfiguration config,
+                                    TransportConfiguration connector) {
       ServerLocatorImpl serverLocator = (ServerLocatorImpl) 
ActiveMQClient.createServerLocatorWithHA(tcConfigs);
-      configAndAdd(name, serverLocator, config, null);
+      configAndAdd(name, serverLocator, config, connector);
    }
 
    private void configAndAdd(SimpleString name,
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index 44a81e4..a617e23 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -639,7 +639,7 @@ public class ClusterManager implements ActiveMQComponent {
 
          clusterConnection = new ClusterConnectionImpl(this, tcConfigs, 
connector, new SimpleString(config.getName()), new 
SimpleString(config.getAddress()), config.getMinLargeMessageSize(), 
config.getClientFailureCheckPeriod(), config.getConnectionTTL(), 
config.getRetryInterval(), config.getRetryIntervalMultiplier(), 
config.getMaxRetryInterval(), config.getInitialConnectAttempts(), 
config.getReconnectAttempts(), config.getCallTimeout(), 
config.getCallFailoverTimeout(), config.isDuplicat [...]
 
-         clusterController.addClusterConnection(clusterConnection.getName(), 
tcConfigs, config);
+         clusterController.addClusterConnection(clusterConnection.getName(), 
tcConfigs, config, connector);
       }
 
       if (defaultClusterConnection == null) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 2e3f8e3..8820885 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -179,6 +179,13 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
 
    private boolean splitBrainDetection;
 
+
+   /** For tests only */
+   public ServerLocatorInternal getServerLocator() {
+      return serverLocator;
+   }
+
+
    public ClusterConnectionImpl(final ClusterManager manager,
                                 final TransportConfiguration[] 
staticTranspConfigs,
                                 final TransportConfiguration connector,
@@ -1553,6 +1560,7 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
             }
             ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, 
tcConfigs);
             locator.setClusterConnection(true);
+            locator.setClusterTransportConfiguration(connector);
             return locator;
          }
          return null;
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index bc6e151..73b7bf6 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -46,12 +46,14 @@ import 
org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.FailoverEventType;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import 
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
@@ -59,6 +61,8 @@ import 
org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
+import org.apache.activemq.artemis.core.server.cluster.BackupManager;
+import org.apache.activemq.artemis.core.server.cluster.ClusterController;
 import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
 import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
 import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy;
@@ -67,7 +71,9 @@ import 
org.apache.activemq.artemis.core.server.cluster.ha.ReplicationBackupPolic
 import 
org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy;
 import 
org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
 import 
org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
 import org.apache.activemq.artemis.core.server.files.FileMoveManager;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
@@ -80,6 +86,7 @@ import org.apache.activemq.artemis.utils.RetryRule;
 import org.apache.activemq.artemis.utils.Wait;
 import org.jboss.logging.Logger;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -2459,6 +2466,56 @@ public class FailoverTest extends FailoverTestBase {
       Assert.assertEquals("message0", cm.getBodyBuffer().readString());
    }
 
+   @Test(timeout = 120000)
+   public void testBackupConnections() throws Exception {
+      Assume.assumeTrue(backupServer.getServer().getHAPolicy().isBackup());
+
+      createSessionFactory();
+
+      CountDownLatch latch = new CountDownLatch(1);
+      sf.addFailoverListener(eventType -> {
+         if (eventType == FailoverEventType.FAILOVER_COMPLETED) {
+            latch.countDown();
+         }
+      });
+
+      BackupManager backupManager = 
((ActiveMQServerImpl)backupServer.getServer()).getBackupManager();
+      ClusterController backupClusterController = 
backupServer.getServer().getClusterManager().getClusterController();
+      ClusterConnectionImpl backupClusterConnection = 
(ClusterConnectionImpl)backupServer.getServer().getClusterManager().getClusterConnections().stream().findFirst().get();
+
+      for (BackupManager.BackupConnector backupConnector : 
backupManager.getBackupConnectors()) {
+         for (ClientSessionFactoryInternal factory : 
((ServerLocatorImpl)backupConnector.getBackupServerLocator()).getFactories()) {
+            Assert.assertNotNull(factory.getConnection());
+         }
+      }
+
+      for (ClientSessionFactoryInternal factory : 
((ServerLocatorImpl)backupClusterController.getDefaultLocator()).getFactories())
 {
+         Assert.assertNotNull(factory.getConnection());
+      }
+
+      Assert.assertNull(backupClusterConnection.getServerLocator());
+
+      Assert.assertNotNull(sf.getConnection());
+
+      crash();
+
+      latch.await();
+
+      for (BackupManager.BackupConnector backupConnector : 
backupManager.getBackupConnectors()) {
+         Assert.assertNull(backupConnector.getBackupServerLocator());
+      }
+
+      for (ClientSessionFactoryInternal factory : 
((ServerLocatorImpl)backupServer.getServer().getClusterManager().getClusterController().getDefaultLocator()).getFactories())
 {
+         Assert.assertNull(factory.getConnection());
+      }
+
+      for (ClientSessionFactoryInternal factory : 
((ServerLocatorImpl)backupClusterConnection.getServerLocator()).getFactories()) 
{
+         Assert.assertNull(factory.getConnection());
+      }
+
+      Assert.assertNotNull(sf.getConnection());
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Reply via email to