This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new bf90860a67 PHOENIX-7566 Fix to copy over lastUpdatedTimeInMs from 
ACTIVE events to STANDBY cluster (#2309) (#2348)
bf90860a67 is described below

commit bf90860a6769d9e69c929d0100715d1cc9ae0b1c
Author: Lokesh Khurana <[email protected]>
AuthorDate: Thu Jan 15 14:35:54 2026 -0800

    PHOENIX-7566 Fix to copy over lastUpdatedTimeInMs from ACTIVE events to 
STANDBY cluster (#2309) (#2348)
    
    Co-authored-by: ritegarg <[email protected]>
---
 .../apache/phoenix/jdbc/HAGroupStateListener.java  | 16 ++++-
 .../apache/phoenix/jdbc/HAGroupStoreClient.java    | 20 +++++-
 .../apache/phoenix/jdbc/HAGroupStoreManager.java   | 80 +++++++++++++++++++---
 .../apache/phoenix/jdbc/HAGroupStoreClientIT.java  | 33 +++++++++
 .../apache/phoenix/jdbc/HAGroupStoreManagerIT.java |  9 +++
 5 files changed, 147 insertions(+), 11 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
index e0e313aeb2..e159293353 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
@@ -37,7 +37,19 @@ import 
org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
 public interface HAGroupStateListener {
 
   /**
-   * Called when an HA group state transition occurs.
+   * <p>
+   * Called when an HA group state transition occurs. ZK Client listens to 
changes and sends update
+   * to subscribers using a single thread to guarantee ordering of events. 
Subscribers get the state
+   * transition callback through this implementation.
+   * </p>
+   * <p>
+   * For example, if subscriber has subscribed to ACTIVE_NOT_IN_SYNC state on 
peer cluster, and the
+   * state transition happens from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the 
subscriber will get the
+   * callback with the following parameters: - haGroupName: the name of the HA 
group that
+   * transitioned - fromState: ACTIVE_IN_SYNC - toState: ACTIVE_NOT_IN_SYNC - 
modifiedTime: the time
+   * the state transition occurred - clusterType: PEER\ - 
lastSyncStateTimeInMs: the time we were in
+   * sync state.
+   * </p>
    * <p>
    * Implementations should be fast and non-blocking to avoid impacting the HA 
group state
    * management system. If heavy processing is required, consider delegating 
to a separate thread.
@@ -49,7 +61,7 @@ public interface HAGroupStateListener {
    * @param toState               the new state after the transition
    * @param modifiedTime          the time the state transition occurred
    * @param clusterType           whether this transition occurred on the 
local or peer cluster
-   * @param lastSyncStateTimeInMs the time we were in sync state, can be null.
+   * @param lastSyncStateTimeInMs the time we were in sync state.
    * @throws Exception implementations may throw exceptions, but they will be 
logged and will not
    *                   prevent other listeners from being notified
    */
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index 44b9b833f9..957c803f31 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -301,6 +301,22 @@ public class HAGroupStoreClient implements Closeable {
   public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState 
haGroupState)
     throws IOException, InvalidClusterRoleTransitionException, SQLException,
     StaleHAGroupStoreRecordVersionException {
+    setHAGroupStatusIfNeeded(haGroupState, null);
+  }
+
+  /**
+   * Set the HA group status for the specified HA group name. Checks if the 
status is needed to be
+   * updated based on logic in isUpdateNeeded function.
+   * @param haGroupState             the HA group state to set
+   * @param lastSyncTimeInMsNullable the last sync time in milliseconds, can 
be null if not known.
+   * @throws IOException                             if the client is not 
healthy or the operation
+   *                                                 fails
+   * @throws StaleHAGroupStoreRecordVersionException if the version is stale
+   * @throws InvalidClusterRoleTransitionException   when transition is not 
valid
+   */
+  public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState 
haGroupState,
+    Long lastSyncTimeInMsNullable) throws IOException, 
InvalidClusterRoleTransitionException,
+    SQLException, StaleHAGroupStoreRecordVersionException {
     Preconditions.checkNotNull(haGroupState, "haGroupState cannot be null");
     if (!isHealthy) {
       throw new IOException("HAGroupStoreClient is not healthy");
@@ -323,7 +339,9 @@ public class HAGroupStoreClient implements Closeable {
       // Once state changes back to ACTIVE_IN_SYNC or the role is
       // NOT ACTIVE or ACTIVE_TO_STANDBY
       // set the time to null to mark that we are current(or we don't have any 
reader).
-      Long lastSyncTimeInMs = 
currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
+      long lastSyncTimeInMs = lastSyncTimeInMsNullable != null
+        ? lastSyncTimeInMsNullable
+        : currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
       ClusterRole clusterRole = haGroupState.getClusterRole();
       if (
         currentHAGroupStoreRecord.getHAGroupState()
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
index 3017312495..a51f730de1 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -74,7 +74,12 @@ public class HAGroupStoreManager {
 
   /**
    * Functional interface for resolving target local states based on current 
local state when peer
-   * cluster transitions occur.
+   * cluster transitions occur. This is used in FailoverManagementListener to 
determine the target
+   * state based on the current local state.
+   * <p>
+   * For example if peer transitions from AIS -> ANIS, the target state 
changes from STANDBY ->
+   * DEGRADED_STANDBY
+   * </p>
    */
   @FunctionalInterface
   private interface TargetStateResolver {
@@ -487,19 +492,18 @@ public class HAGroupStoreManager {
   /**
    * Subscribe to be notified when any transition to a target state occurs.
    * @param haGroupName the name of the HA group to monitor
-   * @param targetState the target state to watch for
+   * @param toState     the target state to watch for
    * @param clusterType whether to monitor local or peer cluster
    * @param listener    the listener to notify when any transition to the 
target state occurs
    * @throws IOException if unable to get HAGroupStoreClient instance
    */
-  public void subscribeToTargetState(String haGroupName,
-    HAGroupStoreRecord.HAGroupState targetState, ClusterType clusterType,
-    HAGroupStateListener listener) throws IOException {
+  public void subscribeToTargetState(String haGroupName, 
HAGroupStoreRecord.HAGroupState toState,
+    ClusterType clusterType, HAGroupStateListener listener) throws IOException 
{
     HAGroupStoreClient client = 
getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
-    client.subscribeToTargetState(targetState, clusterType, listener);
+    client.subscribeToTargetState(toState, clusterType, listener);
     LOGGER.debug(
       "Delegated subscription to target state {} " + "for HA group {} on {} 
cluster to client",
-      targetState, haGroupName, clusterType);
+      toState, haGroupName, clusterType);
   }
 
   /**
@@ -542,6 +546,19 @@ public class HAGroupStoreManager {
    * Helper method to get HAGroupStoreClient instance and setup failover 
management. NOTE: As soon
    * as the HAGroupStoreClient is initialized, it will setup the failover 
management as well.
    * Failover management is only set up once per HA group to prevent duplicate 
subscriptions.
+   * Failover management is responsible for handling the state transitions on 
the local and peer
+   * clusters and react accordingly. Failover management handles peer state 
transitions and local
+   * state transitions.
+   * <p>
+   * Example of peer state transition: For example, if the peer cluster 
transitions from
+   * ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the failover management will 
transition the local cluster
+   * from STANDBY to DEGRADED_STANDBY.
+   * </p>
+   * <p>
+   * Example of local state transition: For example, if the local cluster 
transitions to
+   * ABORT_TO_STANDBY, the failover management will transition the local 
cluster from
+   * ABORT_TO_STANDBY to STANDBY.
+   * </p>
    * @param haGroupName name of the HA group
    * @return HAGroupStoreClient instance for the specified HA group
    * @throws IOException when HAGroupStoreClient is not initialized
@@ -564,6 +581,22 @@ public class HAGroupStoreManager {
 
   // ===== Failover Management Related Methods =====
 
+  /**
+   * Setup local failover management for the given HA group. Local failover 
management is
+   * responsible for handling the state transitions on the local cluster. 
Local failover management
+   * handles local state transitions.
+   * <p>
+   * Example of local state transition: For example, if the local cluster 
transitions to
+   * ABORT_TO_STANDBY, the failover management will transition the local 
cluster from
+   * ABORT_TO_STANDBY to STANDBY.
+   * </p>
+   * When we subscribe to the target state, we provide a 
FailoverManagementListener instance. The
+   * FailoverManagementListener implements the HAGroupStateListener interface 
and overrides the
+   * onStateChange method. The onStateChange method is called when a state 
change event occurs. It
+   * is passed the haGroupName, fromState, toState, clusterType, and 
lastSyncStateTimeInMs
+   * parameters. It is responsible for determining the target state and 
transitioning the local
+   * cluster to the target state based on target state resolver.
+   */
   public void setupLocalFailoverManagement(String haGroupName) throws 
IOException {
     HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
 
@@ -591,6 +624,17 @@ public class HAGroupStoreManager {
       this.resolver = resolver;
     }
 
+    /**
+     * <p>
+     * Example of peer state transition: For example, if the peer cluster 
transitions from
+     * ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the failover management will 
transition the local
+     * cluster from STANDBY to DEGRADED_STANDBY.
+     * </p>
+     * Example input will look like this: haGroupName: test-ha-group 
fromState: ACTIVE_IN_SYNC
+     * toState: ACTIVE_NOT_IN_SYNC clusterType: PEER lastSyncStateTimeInMs: 
1719859200000 Based on
+     * this input, the failover management will transition the local cluster 
from STANDBY to
+     * DEGRADED_STANDBY. The output state will be determined by the 
TargetStateResolver.
+     */
     @Override
     public void onStateChange(String haGroupName, HAGroupState fromState, 
HAGroupState toState,
       long modifiedTime, ClusterType clusterType, Long lastSyncStateTimeInMs) {
@@ -616,8 +660,18 @@ public class HAGroupStoreManager {
             return;
           }
 
+          // If the target state is STANDBY, and we get an event from
+          // PEER cluster, we copy over the lastSyncTimeInMs from PEER event 
notification.
+          Long lastSyncTimeInMsNullable = null;
+          if (
+            targetState.getClusterRole() == 
ClusterRoleRecord.ClusterRole.STANDBY
+              && clusterType == ClusterType.PEER
+          ) {
+            lastSyncTimeInMsNullable = lastSyncStateTimeInMs;
+          }
+
           // Execute transition if valid
-          client.setHAGroupStatusIfNeeded(targetState);
+          client.setHAGroupStatusIfNeeded(targetState, 
lastSyncTimeInMsNullable);
 
           LOGGER.info(
             "Failover management transition: peer {} -> {}, " + "local {} -> 
{} for HA group: {}",
@@ -636,6 +690,16 @@ public class HAGroupStoreManager {
     }
   }
 
+  /**
+   * Setup peer failover management for the given HA group. Peer failover 
management is responsible
+   * for handling the state transitions on the peer cluster. Peer failover 
management handles peer
+   * state transitions.
+   * <p>
+   * Example of peer state transition: For example, if the peer cluster 
transitions from
+   * ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, the failover management will 
transition the local cluster
+   * from STANDBY to DEGRADED_STANDBY.
+   * </p>
+   */
   public void setupPeerFailoverManagement(String haGroupName) throws 
IOException {
     HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index e941e8213d..3f59115503 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -625,6 +625,39 @@ public class HAGroupStoreClientIT extends BaseTest {
       currentRecord.getHAGroupState());
   }
 
+  @Test
+  public void 
testSetHAGroupStatusIfNeededWithExplicitLastSyncTimeUpdateExistingRecord()
+    throws Exception {
+    String haGroupName = testName.getMethodName();
+
+    // Create initial record
+    HAGroupStoreRecord initialRecord =
+      new HAGroupStoreRecord("v1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
+        0L, HighAvailabilityPolicy.FAILOVER.toString(), this.peerZKUrl, 
this.masterUrl,
+        this.peerMasterUrl, 0L);
+    createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, 
initialRecord);
+
+    HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient
+      .getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(), 
haGroupName, zkUrl);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Verify initial state
+    HAGroupStoreRecord currentRecord = 
haGroupStoreClient.getHAGroupStoreRecord();
+    assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, 
currentRecord.getHAGroupState());
+
+    // Update to STANDBY (this should succeed as it's a valid transition)
+    long timestamp = System.currentTimeMillis();
+    haGroupStoreClient.setHAGroupStatusIfNeeded(
+      HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, timestamp);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Verify the record was updated
+    currentRecord = haGroupStoreClient.getHAGroupStoreRecord();
+    assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
+      currentRecord.getHAGroupState());
+    assertEquals(timestamp, (long) currentRecord.getLastSyncStateTimeInMs());
+  }
+
   @Test
   public void testSetHAGroupStatusIfNeededNoUpdateWhenNotNeeded() throws 
Exception {
     String haGroupName = testName.getMethodName();
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
index f15243e6e3..801d779588 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
 import static 
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -848,6 +849,8 @@ public class HAGroupStoreManagerIT extends BaseTest {
     assertTrue("Cluster2 record should be present", 
cluster2Record.isPresent());
     assertEquals("Cluster2 should be in STANDBY state", 
HAGroupStoreRecord.HAGroupState.STANDBY,
       cluster2Record.get().getHAGroupState());
+    assertEquals(0L, (long) cluster1Record.get().getLastSyncStateTimeInMs());
+    assertEquals(0L, (long) cluster2Record.get().getLastSyncStateTimeInMs());
 
     // === STEP 1: Transition to store-and-forward mode ===
     // Move cluster1 from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC 
(store-and-forward mode)
@@ -869,6 +872,9 @@ public class HAGroupStoreManagerIT extends BaseTest {
     assertTrue("Cluster2 record should be present", 
cluster2Record.isPresent());
     assertEquals("Cluster2 should automatically transition to 
DEGRADED_STANDBY",
       HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, 
cluster2Record.get().getHAGroupState());
+    assertNotEquals(0L, (long) 
cluster1Record.get().getLastSyncStateTimeInMs());
+    assertEquals(cluster2Record.get().getLastSyncStateTimeInMs(),
+      cluster1Record.get().getLastSyncStateTimeInMs());
 
     // === STEP 3: Return to sync mode ===
     // Move cluster1 back from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC
@@ -892,5 +898,8 @@ public class HAGroupStoreManagerIT extends BaseTest {
     assertTrue("Cluster2 record should be present", 
cluster2Record.isPresent());
     assertEquals("Cluster2 should automatically transition back to STANDBY",
       HAGroupStoreRecord.HAGroupState.STANDBY, 
cluster2Record.get().getHAGroupState());
+    assertNotEquals(0L, (long) 
cluster1Record.get().getLastSyncStateTimeInMs());
+    assertEquals(cluster2Record.get().getLastSyncStateTimeInMs(),
+      cluster1Record.get().getLastSyncStateTimeInMs());
   }
 }

Reply via email to