This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit f83a8ee138938e0ba1d37754988978a741528c4b
Author: Lokesh Khurana <khuranalokes...@gmail.com>
AuthorDate: Thu May 1 14:43:17 2025 -0700

    PHOENIX-7586 :- Handle Role transitions for ActiveToStandby role in Failover 
HAPolicy (#2128)
---
 .../phoenix/jdbc/HighAvailabilityPolicy.java       |  32 +++++-
 .../phoenix/jdbc/FailoverPhoenixConnection2IT.java | 110 +++++++++++++++++++++
 2 files changed, 138 insertions(+), 4 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
index 64c60a4a71..51b41a2822 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
@@ -19,6 +19,7 @@
 package org.apache.phoenix.jdbc;
 
 import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.ACTIVE;
+import static 
org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY;
 import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.STANDBY;
 
 import java.sql.Connection;
@@ -47,6 +48,25 @@ public enum HighAvailabilityPolicy {
             return new FailoverPhoenixConnection(context);
         }
 
+        /**
+         * Cluster role transitions for the Failover High Availability Policy. 
Here we try to
+         * close connections late, so that existing reads can continue during 
failover.
+         * ACTIVE --> ACTIVE_TO_STANDBY (Doing nothing, as we are in the process of 
moving the current
+         *      cluster to STANDBY; at this step writes are blocked to drain 
replication, which allows
+         *      existing reads to continue)
+         * ACTIVE|ACTIVE_TO_STANDBY --> STANDBY (Closing all current 
connections)
+         *
+         * STANDBY --> ACTIVE (Invalidate CQSI, as connections have been closed 
and are now being cleared)
+         * STANDBY --> ACTIVE_TO_STANDBY (Should not normally happen, but on a 
failover rollback we
+         *      go back to the ACTIVE_TO_STANDBY state; CQSI is invalidated early 
here, as connections
+         *      are already closed)
+         * ACTIVE_TO_STANDBY --> ACTIVE (Doing nothing, as we have already 
invalidated CQSI when we
+         *      transitioned from STANDBY to ACTIVE_TO_STANDBY)
+         * @param haGroup The high availability (HA) group
+         * @param oldRecord The older cluster role record cached in this 
client for the given HA group
+         * @param newRecord New cluster role record read from one ZooKeeper 
cluster znode
+         * @throws SQLException
+         */
         @Override
         void transitClusterRole(HighAvailabilityGroup haGroup, 
ClusterRoleRecord oldRecord,
                 ClusterRoleRecord newRecord) throws SQLException {
@@ -55,16 +75,20 @@ public enum HighAvailabilityPolicy {
                         "Doing nothing for Cluster Role Change");
                 return;
             }
-            if (oldRecord.getRole1() == ACTIVE && newRecord.getRole1() == 
STANDBY) {
+            if ((oldRecord.getRole1() == ACTIVE || oldRecord.getRole1() == 
ACTIVE_TO_STANDBY)
+                    && newRecord.getRole1() == STANDBY) {
                 transitStandby(haGroup, oldRecord.getUrl1(), 
oldRecord.getRegistryType());
             }
-            if (oldRecord.getRole2() == ACTIVE && newRecord.getRole2() == 
STANDBY) {
+            if ((oldRecord.getRole2() == ACTIVE || oldRecord.getRole2() == 
ACTIVE_TO_STANDBY)
+                    && newRecord.getRole2() == STANDBY) {
                 transitStandby(haGroup, oldRecord.getUrl2(), 
oldRecord.getRegistryType());
             }
-            if (oldRecord.getRole1() != ACTIVE && newRecord.getRole1() == 
ACTIVE) {
+            if ((oldRecord.getRole1() != ACTIVE && oldRecord.getRole1() != 
ACTIVE_TO_STANDBY)
+                    && (newRecord.getRole1() == ACTIVE || newRecord.getRole1() 
== ACTIVE_TO_STANDBY)) {
                 transitActive(haGroup, oldRecord.getUrl1(), 
oldRecord.getRegistryType());
             }
-            if (oldRecord.getRole2() != ACTIVE && newRecord.getRole2() == 
ACTIVE) {
+            if ((oldRecord.getRole2() != ACTIVE && oldRecord.getRole2() != 
ACTIVE_TO_STANDBY)
+                    && (newRecord.getRole2() == ACTIVE || newRecord.getRole2() 
== ACTIVE_TO_STANDBY)) {
                 transitActive(haGroup, oldRecord.getUrl2(), 
oldRecord.getRegistryType());
             }
         }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection2IT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection2IT.java
index a93a13c4f6..03411c888c 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection2IT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection2IT.java
@@ -27,9 +27,12 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -60,6 +63,7 @@ import org.slf4j.LoggerFactory;
 public class FailoverPhoenixConnection2IT {
     private static final Logger LOG = 
LoggerFactory.getLogger(FailoverPhoenixConnectionIT.class);
     private static final 
HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS = new 
HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
+    private static final long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 1000L;
 
     @Rule
     public final TestName testName = new TestName();
@@ -470,6 +474,112 @@ public class FailoverPhoenixConnection2IT {
         }
     }
 
+    /**
+     * Test connections behaviour when doing 2-step failover.
+     * cluster 1 ( ACTIVE --> ACTIVE_TO_STANDBY --> STANDBY )
+     */
+    @Test(timeout = 300000)
+    public void 
testAllWrappedConnectionsClosedAtRightTimeDuringClusterRoleChange()
+            throws Exception {
+        //Write Some data to ACTIVE cluster
+        Connection connection = createFailoverConnection();
+        Statement statement = connection.createStatement();
+        short numOfRows = 100;
+        for (int i = 0; i < numOfRows; i++) {
+            statement.executeUpdate(String.format("UPSERT INTO %s VALUES(%d, 
1984)", tableName, i));
+        }
+        connection.commit();
+
+        //Creating some connections to ACTIVE cluster
+        short numberOfConnections = 10;
+        //Create FailoverPhoenixConnections with default urls
+        List<Connection> connectionList = new ArrayList<>(numberOfConnections);
+        for (short i = 0; i < numberOfConnections; i++) {
+            connectionList.add(createFailoverConnection());
+        }
+
+        //Transit Role1 ACTIVE --> ACTIVE_TO_STANDBY
+        CLUSTERS.transitClusterRole(haGroup, 
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                ClusterRoleRecord.ClusterRole.STANDBY);
+        //Connections should be open
+        assertFalse(connection.isClosed());
+        for (short i = 0; i < numberOfConnections; i++) {
+            FailoverPhoenixConnection conn = ((FailoverPhoenixConnection) 
connectionList.get(i));
+            assertFalse(conn.isClosed());
+            assertFalse(conn.getWrappedConnection().isClosed());
+        }
+
+        //Read with the open connection for random id
+        try {
+            ResultSet rs = connection.createStatement().executeQuery(
+                    String.format("SELECT v FROM %s WHERE id = %d", tableName, 
50));
+            assertTrue(rs.next());
+            assertEquals(1984, rs.getInt(1));
+        } catch (Exception e) {
+            fail();
+        }
+
+        //Transit Role1 ACTIVE_TO_STANDBY --> STANDBY
+        CLUSTERS.transitClusterRole(haGroup, 
ClusterRoleRecord.ClusterRole.STANDBY,
+                ClusterRoleRecord.ClusterRole.STANDBY);
+        //Wrapped connections should be closed; the failover connections stay open
+        for (short i = 0; i < numberOfConnections; i++) {
+            FailoverPhoenixConnection conn = ((FailoverPhoenixConnection) 
connectionList.get(i));
+            assertFalse(conn.isClosed());
+            assertTrue(conn.getWrappedConnection().isClosed());
+        }
+
+        //Try reading again, but it should throw SQLException
+        try {
+            connection.createStatement().executeQuery(
+                    String.format("SELECT v FROM %s WHERE id = %d", tableName, 
50));
+            fail();
+        } catch (Exception e) {
+            if (e instanceof SQLException) {
+                //Expected as connections should be closed
+            } else {
+                fail();
+            }
+        }
+    }
+
+    /**
+     * Test early rollback of Failover Policy where connections should not be 
affected
+     * ACTIVE --> ACTIVE_TO_STANDBY and then ACTIVE_TO_STANDBY --> ACTIVE
+     */
+    @Test(timeout = 300000)
+    public void testEarlyRollbackHasNoEffectOnFailoverConnections() throws 
Exception {
+        //Creating some connections to ACTIVE cluster
+        short numberOfConnections = 3;
+        //Create FailoverPhoenixConnections with default urls
+        List<Connection> connectionList = new ArrayList<>(numberOfConnections);
+        for (short i = 0; i < numberOfConnections; i++) {
+            connectionList.add(createFailoverConnection());
+        }
+
+        //Transit Role1 ACTIVE --> ACTIVE_TO_STANDBY
+        CLUSTERS.transitClusterRole(haGroup, 
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                ClusterRoleRecord.ClusterRole.STANDBY);
+        //Connections should be open
+        for (short i = 0; i < numberOfConnections; i++) {
+            FailoverPhoenixConnection conn = ((FailoverPhoenixConnection) 
connectionList.get(i));
+            assertFalse(conn.isClosed());
+            assertFalse(conn.getWrappedConnection().isClosed());
+        }
+
+        //Transit Role1 ACTIVE_TO_STANDBY --> ACTIVE
+        CLUSTERS.transitClusterRole(haGroup, 
ClusterRoleRecord.ClusterRole.ACTIVE,
+                ClusterRoleRecord.ClusterRole.STANDBY);
+        //Connections should still be open
+        for (short i = 0; i < numberOfConnections; i++) {
+            FailoverPhoenixConnection conn = ((FailoverPhoenixConnection) 
connectionList.get(i));
+            assertFalse(conn.isClosed());
+            assertFalse(conn.getWrappedConnection().isClosed());
+            //closing connections
+            conn.close();
+        }
+    }
+
     /**
      * Create a failover connection using {@link #clientProperties}.
      */

Reply via email to