This is an automated email from the ASF dual-hosted git repository. apurtell pushed a commit to branch PHOENIX-7562-feature in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit f83a8ee138938e0ba1d37754988978a741528c4b Author: Lokesh Khurana <khuranalokes...@gmail.com> AuthorDate: Thu May 1 14:43:17 2025 -0700 PHOENIX-7586 :- Handle Role transitions for ActiveToStanby role in Failover HAPolicy (#2128) --- .../phoenix/jdbc/HighAvailabilityPolicy.java | 32 +++++- .../phoenix/jdbc/FailoverPhoenixConnection2IT.java | 110 +++++++++++++++++++++ 2 files changed, 138 insertions(+), 4 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java index 64c60a4a71..51b41a2822 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java @@ -19,6 +19,7 @@ package org.apache.phoenix.jdbc; import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.ACTIVE; +import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY; import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.STANDBY; import java.sql.Connection; @@ -47,6 +48,25 @@ public enum HighAvailabilityPolicy { return new FailoverPhoenixConnection(context); } + /** + * Cluster Role Transitions for Failover High Availability Policy, Here we are trying to + * close connections late to allow existing reads to continue during Failover. 
+ * ACTIVE --> ACTIVE_TO_STANDBY (Doing nothing as we are in the process of moving the current + * cluster to STANDBY; at this step we are blocking writes to drain replication, which allows + * existing reads to continue) + * ACTIVE|ACTIVE_TO_STANDBY --> STANDBY (Closing all current connections) + * + * STANDBY --> ACTIVE (Invalidate CQSI as connections have been closed and are now being cleared) + * STANDBY --> ACTIVE_TO_STANDBY (Should not normally happen, but in case of failover rollback we + * are going back to the ACTIVE_TO_STANDBY state; invalidating earlier here as connections + * are already closed) + * ACTIVE_TO_STANDBY --> ACTIVE (Doing nothing as we have already invalidated CQSI when we + * transitioned from STANDBY to ACTIVE_TO_STANDBY) + * @param haGroup The high availability (HA) group + * @param oldRecord The older cluster role record cached in this client for the given HA group + * @param newRecord New cluster role record read from one ZooKeeper cluster znode + * @throws SQLException + */ @Override void transitClusterRole(HighAvailabilityGroup haGroup, ClusterRoleRecord oldRecord, ClusterRoleRecord newRecord) throws SQLException { @@ -55,16 +75,20 @@ public enum HighAvailabilityPolicy { "Doing nothing for Cluster Role Change"); return; } - if (oldRecord.getRole1() == ACTIVE && newRecord.getRole1() == STANDBY) { + if ((oldRecord.getRole1() == ACTIVE || oldRecord.getRole1() == ACTIVE_TO_STANDBY) + && newRecord.getRole1() == STANDBY) { transitStandby(haGroup, oldRecord.getUrl1(), oldRecord.getRegistryType()); } - if (oldRecord.getRole2() == ACTIVE && newRecord.getRole2() == STANDBY) { + if ((oldRecord.getRole2() == ACTIVE || oldRecord.getRole2() == ACTIVE_TO_STANDBY) + && newRecord.getRole2() == STANDBY) { transitStandby(haGroup, oldRecord.getUrl2(), oldRecord.getRegistryType()); } - if (oldRecord.getRole1() != ACTIVE && newRecord.getRole1() == ACTIVE) { + if ((oldRecord.getRole1() != ACTIVE && oldRecord.getRole1() != ACTIVE_TO_STANDBY) + && (newRecord.getRole1() == 
ACTIVE || newRecord.getRole1() == ACTIVE_TO_STANDBY)) { transitActive(haGroup, oldRecord.getUrl1(), oldRecord.getRegistryType()); } - if (oldRecord.getRole2() != ACTIVE && newRecord.getRole2() == ACTIVE) { + if ((oldRecord.getRole2() != ACTIVE && oldRecord.getRole2() != ACTIVE_TO_STANDBY) + && (newRecord.getRole2() == ACTIVE || newRecord.getRole2() == ACTIVE_TO_STANDBY)) { transitActive(haGroup, oldRecord.getUrl2(), oldRecord.getRegistryType()); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection2IT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection2IT.java index a93a13c4f6..03411c888c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection2IT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection2IT.java @@ -27,9 +27,12 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -60,6 +63,7 @@ import org.slf4j.LoggerFactory; public class FailoverPhoenixConnection2IT { private static final Logger LOG = LoggerFactory.getLogger(FailoverPhoenixConnectionIT.class); private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS = new HighAvailabilityTestingUtility.HBaseTestingUtilityPair(); + private static final long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 1000L; @Rule public final TestName testName = new TestName(); @@ -470,6 +474,112 @@ public class FailoverPhoenixConnection2IT { } } + /** + * Test connections behaviour when doing 2-step failover. 
+ * cluster 1 ( ACTIVE --> ACTIVE_TO_STANDBY --> STANDBY ) + */ + @Test(timeout = 300000) + public void testAllWrappedConnectionsClosedAtRightTimeDuringClusterRoleChange() + throws Exception { + //Write Some data to ACTIVE cluster + Connection connection = createFailoverConnection(); + Statement statement = connection.createStatement(); + short numOfRows = 100; + for (int i = 0; i < numOfRows; i++) { + statement.executeUpdate(String.format("UPSERT INTO %s VALUES(%d, 1984)", tableName, i)); + } + connection.commit(); + + //Creating some connections to ACTIVE cluster + short numberOfConnections = 10; + //Create FailoverPhoenixConnections with default urls + List<Connection> connectionList = new ArrayList<>(numberOfConnections); + for (short i = 0; i < numberOfConnections; i++) { + connectionList.add(createFailoverConnection()); + } + + //Transit Role1 ACTIVE --> ACTIVE_TO_STANDBY + CLUSTERS.transitClusterRole(haGroup, ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + ClusterRoleRecord.ClusterRole.STANDBY); + //Connections should be open + assertFalse(connection.isClosed()); + for (short i = 0; i < numberOfConnections; i++) { + FailoverPhoenixConnection conn = ((FailoverPhoenixConnection) connectionList.get(i)); + assertFalse(conn.isClosed()); + assertFalse(conn.getWrappedConnection().isClosed()); + } + + //Read with the open connection for random id + try { + ResultSet rs = connection.createStatement().executeQuery( + String.format("SELECT v FROM %s WHERE id = %d", tableName, 50)); + assertTrue(rs.next()); + assertEquals(1984, rs.getInt(1)); + } catch (Exception e) { + fail(); + } + + //Transit Role1 ACTIVE_TO_STANDBY --> STANDBY + CLUSTERS.transitClusterRole(haGroup, ClusterRoleRecord.ClusterRole.STANDBY, + ClusterRoleRecord.ClusterRole.STANDBY); + //Connections should be closed + for (short i = 0; i < numberOfConnections; i++) { + FailoverPhoenixConnection conn = ((FailoverPhoenixConnection) connectionList.get(i)); + assertFalse(conn.isClosed()); + 
assertTrue(conn.getWrappedConnection().isClosed()); + } + + //Try reading again, but it should throw SQLException + try { + connection.createStatement().executeQuery( + String.format("SELECT v FROM %s WHERE id = %d", tableName, 50)); + fail(); + } catch (Exception e) { + if (e instanceof SQLException) { + //Expected as connections should be closed + } else { + fail(); + } + } + } + + /** + * Test early rollback of Failover Policy where connections should not be affected + * ACTIVE --> ACTIVE_TO_STANDBY and then ACTIVE_TO_STANDBY --> ACTIVE + */ + @Test(timeout = 300000) + public void testEarlyRollbackHasNoEffectOnFailoverConnections() throws Exception { + //Creating some connections to ACTIVE cluster + short numberOfConnections = 3; + //Create FailoverPhoenixConnections with default urls + List<Connection> connectionList = new ArrayList<>(numberOfConnections); + for (short i = 0; i < numberOfConnections; i++) { + connectionList.add(createFailoverConnection()); + } + + //Transit Role1 ACTIVE --> ACTIVE_TO_STANDBY + CLUSTERS.transitClusterRole(haGroup, ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + ClusterRoleRecord.ClusterRole.STANDBY); + //Connections should be open + for (short i = 0; i < numberOfConnections; i++) { + FailoverPhoenixConnection conn = ((FailoverPhoenixConnection) connectionList.get(i)); + assertFalse(conn.isClosed()); + assertFalse(conn.getWrappedConnection().isClosed()); + } + + //Transit Role1 ACTIVE_TO_STANDBY --> ACTIVE + CLUSTERS.transitClusterRole(haGroup, ClusterRoleRecord.ClusterRole.ACTIVE, + ClusterRoleRecord.ClusterRole.STANDBY); + //Connections should still be open + for (short i = 0; i < numberOfConnections; i++) { + FailoverPhoenixConnection conn = ((FailoverPhoenixConnection) connectionList.get(i)); + assertFalse(conn.isClosed()); + assertFalse(conn.getWrappedConnection().isClosed()); + //closing connections + conn.close(); + } + } + /** * Create a failover connection using {@link #clientProperties}. */