This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new bf90860a67 PHOENIX-7566 Fix to copy over lastUpdatedTimeInMs from
ACTIVE events to STANDBY cluster (#2309) (#2348)
bf90860a67 is described below
commit bf90860a6769d9e69c929d0100715d1cc9ae0b1c
Author: Lokesh Khurana <[email protected]>
AuthorDate: Thu Jan 15 14:35:54 2026 -0800
PHOENIX-7566 Fix to copy over lastUpdatedTimeInMs from ACTIVE events to
STANDBY cluster (#2309) (#2348)
Co-authored-by: ritegarg <[email protected]>
---
.../apache/phoenix/jdbc/HAGroupStateListener.java | 16 ++++-
.../apache/phoenix/jdbc/HAGroupStoreClient.java | 20 +++++-
.../apache/phoenix/jdbc/HAGroupStoreManager.java | 80 +++++++++++++++++++---
.../apache/phoenix/jdbc/HAGroupStoreClientIT.java | 33 +++++++++
.../apache/phoenix/jdbc/HAGroupStoreManagerIT.java | 9 +++
5 files changed, 147 insertions(+), 11 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
index e0e313aeb2..e159293353 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
@@ -37,7 +37,19 @@ import
org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
public interface HAGroupStateListener {
/**
- * Called when an HA group state transition occurs.
+ * <p>
+ * Called when an HA group state transition occurs. ZK Client listens to
changes and sends update
+ * to subscribers using a single thread to guarantee ordering of events.
Subscribers get the state
+ * transition callback through this implementation.
+ * </p>
+ * <p>
+ * For example, if subscriber has subscribed to ACTIVE_NOT_IN_SYNC state on
peer cluster, and the
+ * state transition happens from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the
subscriber will get the
+ * callback with the following parameters: - haGroupName: the name of the HA
group that
+ * transitioned - fromState: ACTIVE_IN_SYNC - toState: ACTIVE_NOT_IN_SYNC -
modifiedTime: the time
+ * the state transition occurred - clusterType: PEER -
lastSyncStateTimeInMs: the time we were in
+ * sync state.
+ * </p>
* <p>
* Implementations should be fast and non-blocking to avoid impacting the HA
group state
* management system. If heavy processing is required, consider delegating
to a separate thread.
@@ -49,7 +61,7 @@ public interface HAGroupStateListener {
* @param toState the new state after the transition
* @param modifiedTime the time the state transition occurred
* @param clusterType whether this transition occurred on the
local or peer cluster
- * @param lastSyncStateTimeInMs the time we were in sync state, can be null.
+ * @param lastSyncStateTimeInMs the time we were in sync state.
* @throws Exception implementations may throw exceptions, but they will be
logged and will not
* prevent other listeners from being notified
*/
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index 44b9b833f9..957c803f31 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -301,6 +301,22 @@ public class HAGroupStoreClient implements Closeable {
public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState
haGroupState)
throws IOException, InvalidClusterRoleTransitionException, SQLException,
StaleHAGroupStoreRecordVersionException {
+ setHAGroupStatusIfNeeded(haGroupState, null);
+ }
+
+ /**
+ * Set the HA group status for the specified HA group name. Checks if the
status is needed to be
+ * updated based on logic in isUpdateNeeded function.
+ * @param haGroupState the HA group state to set
+ * @param lastSyncTimeInMsNullable the last sync time in milliseconds, can
be null if not known.
+ * @throws IOException if the client is not
healthy or the operation
+ * fails
+ * @throws StaleHAGroupStoreRecordVersionException if the version is stale
+ * @throws InvalidClusterRoleTransitionException when transition is not
valid
+ */
+ public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState
haGroupState,
+ Long lastSyncTimeInMsNullable) throws IOException,
InvalidClusterRoleTransitionException,
+ SQLException, StaleHAGroupStoreRecordVersionException {
Preconditions.checkNotNull(haGroupState, "haGroupState cannot be null");
if (!isHealthy) {
throw new IOException("HAGroupStoreClient is not healthy");
@@ -323,7 +339,9 @@ public class HAGroupStoreClient implements Closeable {
// Once state changes back to ACTIVE_IN_SYNC or the role is
// NOT ACTIVE or ACTIVE_TO_STANDBY
// set the time to null to mark that we are current(or we don't have any
reader).
- Long lastSyncTimeInMs =
currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
+ long lastSyncTimeInMs = lastSyncTimeInMsNullable != null
+ ? lastSyncTimeInMsNullable
+ : currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
ClusterRole clusterRole = haGroupState.getClusterRole();
if (
currentHAGroupStoreRecord.getHAGroupState()
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
index 3017312495..a51f730de1 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -74,7 +74,12 @@ public class HAGroupStoreManager {
/**
* Functional interface for resolving target local states based on current
local state when peer
- * cluster transitions occur.
+ * cluster transitions occur. This is used in FailoverManagementListener to
determine the target
+ * state based on the current local state.
+ * <p>
+ * For example, if the peer transitions from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the target
+ * local state changes from STANDBY to DEGRADED_STANDBY.
+ * </p>
*/
@FunctionalInterface
private interface TargetStateResolver {
@@ -487,19 +492,18 @@ public class HAGroupStoreManager {
/**
* Subscribe to be notified when any transition to a target state occurs.
* @param haGroupName the name of the HA group to monitor
- * @param targetState the target state to watch for
+ * @param toState the target state to watch for
* @param clusterType whether to monitor local or peer cluster
* @param listener the listener to notify when any transition to the
target state occurs
* @throws IOException if unable to get HAGroupStoreClient instance
*/
- public void subscribeToTargetState(String haGroupName,
- HAGroupStoreRecord.HAGroupState targetState, ClusterType clusterType,
- HAGroupStateListener listener) throws IOException {
+ public void subscribeToTargetState(String haGroupName,
HAGroupStoreRecord.HAGroupState toState,
+ ClusterType clusterType, HAGroupStateListener listener) throws IOException
{
HAGroupStoreClient client =
getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
- client.subscribeToTargetState(targetState, clusterType, listener);
+ client.subscribeToTargetState(toState, clusterType, listener);
LOGGER.debug(
"Delegated subscription to target state {} " + "for HA group {} on {}
cluster to client",
- targetState, haGroupName, clusterType);
+ toState, haGroupName, clusterType);
}
/**
@@ -542,6 +546,19 @@ public class HAGroupStoreManager {
* Helper method to get HAGroupStoreClient instance and setup failover
management. NOTE: As soon
* as the HAGroupStoreClient is initialized, it will setup the failover
management as well.
* Failover management is only set up once per HA group to prevent duplicate
subscriptions.
+ * Failover management is responsible for handling the state transitions on
the local and peer
+ * clusters and react accordingly. Failover management handles peer state
transitions and local
+ * state transitions.
+ * <p>
+ * Example of peer state transition: For example, if the peer cluster
transitions from
+ * ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the failover management will
transition the local cluster
+ * from STANDBY to DEGRADED_STANDBY.
+ * </p>
+ * <p>
+ * Example of local state transition: For example, if the local cluster
transitions to
+ * ABORT_TO_STANDBY, the failover management will transition the local
cluster from
+ * ABORT_TO_STANDBY to STANDBY.
+ * </p>
* @param haGroupName name of the HA group
* @return HAGroupStoreClient instance for the specified HA group
* @throws IOException when HAGroupStoreClient is not initialized
@@ -564,6 +581,22 @@ public class HAGroupStoreManager {
// ===== Failover Management Related Methods =====
+ /**
+ * Setup local failover management for the given HA group. Local failover
management is
+ * responsible for handling the state transitions on the local cluster.
Local failover management
+ * handles local state transitions.
+ * <p>
+ * Example of local state transition: For example, if the local cluster
transitions to
+ * ABORT_TO_STANDBY, the failover management will transition the local
cluster from
+ * ABORT_TO_STANDBY to STANDBY.
+ * </p>
+ * When we subscribe to the target state, we provide a
FailoverManagementListener instance. The
+ * FailoverManagementListener implements the HAGroupStateListener interface
and overrides the
+ * onStateChange method. The onStateChange method is called when a state
change event occurs. It
+ * is passed the haGroupName, fromState, toState, clusterType, and
lastSyncStateTimeInMs
+ * parameters. It is responsible for determining the target state and
transitioning the local
+ * cluster to the target state based on target state resolver.
+ */
public void setupLocalFailoverManagement(String haGroupName) throws
IOException {
HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
@@ -591,6 +624,17 @@ public class HAGroupStoreManager {
this.resolver = resolver;
}
+ /**
+ * <p>
+ * Example of peer state transition: For example, if the peer cluster
transitions from
+ * ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the failover management will
transition the local
+ * cluster from STANDBY to DEGRADED_STANDBY.
+ * </p>
+ * Example input will look like this: haGroupName: test-ha-group
fromState: ACTIVE_IN_SYNC
+ * toState: ACTIVE_NOT_IN_SYNC clusterType: PEER lastSyncStateTimeInMs:
1719859200000 Based on
+ * this input, the failover management will transition the local cluster
from STANDBY to
+ * DEGRADED_STANDBY. The output state will be determined by the
TargetStateResolver.
+ */
@Override
public void onStateChange(String haGroupName, HAGroupState fromState,
HAGroupState toState,
long modifiedTime, ClusterType clusterType, Long lastSyncStateTimeInMs) {
@@ -616,8 +660,18 @@ public class HAGroupStoreManager {
return;
}
+ // If the target state is STANDBY, and we get an event from
+ // PEER cluster, we copy over the lastSyncTimeInMs from PEER event
notification.
+ Long lastSyncTimeInMsNullable = null;
+ if (
+ targetState.getClusterRole() ==
ClusterRoleRecord.ClusterRole.STANDBY
+ && clusterType == ClusterType.PEER
+ ) {
+ lastSyncTimeInMsNullable = lastSyncStateTimeInMs;
+ }
+
// Execute transition if valid
- client.setHAGroupStatusIfNeeded(targetState);
+ client.setHAGroupStatusIfNeeded(targetState,
lastSyncTimeInMsNullable);
LOGGER.info(
"Failover management transition: peer {} -> {}, " + "local {} ->
{} for HA group: {}",
@@ -636,6 +690,16 @@ public class HAGroupStoreManager {
}
}
+ /**
+ * Setup peer failover management for the given HA group. Peer failover
management is responsible
+ * for handling the state transitions on the peer cluster. Peer failover
management handles peer
+ * state transitions.
+ * <p>
+ * Example of peer state transition: For example, if the peer cluster
transitions from
+ * ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the failover management will
transition the local cluster
+ * from STANDBY to DEGRADED_STANDBY.
+ * </p>
+ */
public void setupPeerFailoverManagement(String haGroupName) throws
IOException {
HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index e941e8213d..3f59115503 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -625,6 +625,39 @@ public class HAGroupStoreClientIT extends BaseTest {
currentRecord.getHAGroupState());
}
+ @Test
+ public void
testSetHAGroupStatusIfNeededWithExplicitLastSyncTimeUpdateExistingRecord()
+ throws Exception {
+ String haGroupName = testName.getMethodName();
+
+ // Create initial record
+ HAGroupStoreRecord initialRecord =
+ new HAGroupStoreRecord("v1.0", haGroupName,
HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
+ 0L, HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl,
this.masterUrl,
+ this.peerMasterUrl, 0L);
+ createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName,
initialRecord);
+
+ HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient
+ .getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(),
haGroupName, zkUrl);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify initial state
+ HAGroupStoreRecord currentRecord =
haGroupStoreClient.getHAGroupStoreRecord();
+ assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
currentRecord.getHAGroupState());
+
+ // Update to STANDBY (this should succeed as it's a valid transition)
+ long timestamp = System.currentTimeMillis();
+ haGroupStoreClient.setHAGroupStatusIfNeeded(
+ HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, timestamp);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify the record was updated
+ currentRecord = haGroupStoreClient.getHAGroupStoreRecord();
+ assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
+ currentRecord.getHAGroupState());
+ assertEquals(timestamp, (long) currentRecord.getLastSyncStateTimeInMs());
+ }
+
@Test
public void testSetHAGroupStatusIfNeededNoUpdateWhenNotNeeded() throws
Exception {
String haGroupName = testName.getMethodName();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
index f15243e6e3..801d779588 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -848,6 +849,8 @@ public class HAGroupStoreManagerIT extends BaseTest {
assertTrue("Cluster2 record should be present",
cluster2Record.isPresent());
assertEquals("Cluster2 should be in STANDBY state",
HAGroupStoreRecord.HAGroupState.STANDBY,
cluster2Record.get().getHAGroupState());
+ assertEquals(0L, (long) cluster1Record.get().getLastSyncStateTimeInMs());
+ assertEquals(0L, (long) cluster2Record.get().getLastSyncStateTimeInMs());
// === STEP 1: Transition to store-and-forward mode ===
// Move cluster1 from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC
(store-and-forward mode)
@@ -869,6 +872,9 @@ public class HAGroupStoreManagerIT extends BaseTest {
assertTrue("Cluster2 record should be present",
cluster2Record.isPresent());
assertEquals("Cluster2 should automatically transition to
DEGRADED_STANDBY",
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY,
cluster2Record.get().getHAGroupState());
+ assertNotEquals(0L, (long)
cluster1Record.get().getLastSyncStateTimeInMs());
+ assertEquals(cluster2Record.get().getLastSyncStateTimeInMs(),
+ cluster1Record.get().getLastSyncStateTimeInMs());
// === STEP 3: Return to sync mode ===
// Move cluster1 back from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC
@@ -892,5 +898,8 @@ public class HAGroupStoreManagerIT extends BaseTest {
assertTrue("Cluster2 record should be present",
cluster2Record.isPresent());
assertEquals("Cluster2 should automatically transition back to STANDBY",
HAGroupStoreRecord.HAGroupState.STANDBY,
cluster2Record.get().getHAGroupState());
+ assertNotEquals(0L, (long)
cluster1Record.get().getLastSyncStateTimeInMs());
+ assertEquals(cluster2Record.get().getLastSyncStateTimeInMs(),
+ cluster1Record.get().getLastSyncStateTimeInMs());
}
}