This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this 
push:
     new 46889cffb8 PHOENIX-7566 Fix to copy over lastUpdatedTimeInMs from 
ACTIVE events to STANDBY cluster (#2309)
46889cffb8 is described below

commit 46889cffb85661eee59c71a2942eaec39d93ec78
Author: ritegarg <[email protected]>
AuthorDate: Mon Nov 3 13:40:29 2025 -0800

    PHOENIX-7566 Fix to copy over lastUpdatedTimeInMs from ACTIVE events to 
STANDBY cluster (#2309)
    
    Co-authored-by: Ritesh Garg 
<[email protected]>
---
 .../apache/phoenix/jdbc/HAGroupStateListener.java  | 19 ++++-
 .../apache/phoenix/jdbc/HAGroupStoreClient.java    | 29 ++++++-
 .../apache/phoenix/jdbc/HAGroupStoreManager.java   | 94 ++++++++++++++++++++--
 .../apache/phoenix/jdbc/HAGroupStoreClientIT.java  | 30 +++++++
 .../apache/phoenix/jdbc/HAGroupStoreManagerIT.java | 16 +++-
 5 files changed, 173 insertions(+), 15 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
index 1446514f8c..ad2d1620d6 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
@@ -36,8 +36,23 @@ import 
org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
 public interface HAGroupStateListener {
 
     /**
+     * <p>
      * Called when an HA group state transition occurs.
-     *
+     * ZK Client listens to changes and sends update to subscribers using a 
single thread to
+     * guarantee ordering of events. Subscribers get the state transition 
callback
+     * through this implementation.
+     * </p>
+     * <p>
+     * For example, if subscriber has subscribed to ACTIVE_NOT_IN_SYNC state 
on peer cluster,
+     * and the state transition happens from ACTIVE_IN_SYNC to 
ACTIVE_NOT_IN_SYNC,
+     * the subscriber will get the callback with the following parameters:
+     * - haGroupName: the name of the HA group that transitioned
+     * - fromState: ACTIVE_IN_SYNC
+     * - toState: ACTIVE_NOT_IN_SYNC
+     * - modifiedTime: the time the state transition occurred
+     * - clusterType: PEER
+     * - lastSyncStateTimeInMs: the time we were in sync state.
+     * </p>
      * <p>Implementations should be fast and non-blocking to avoid impacting
      * the HA group state management system. If heavy processing is required,
      * consider delegating to a separate thread.</p>
@@ -50,7 +65,7 @@ public interface HAGroupStateListener {
      * @param toState the new state after the transition
      * @param modifiedTime the time the state transition occurred
      * @param clusterType whether this transition occurred on the local or 
peer cluster
-     * @param lastSyncStateTimeInMs the time we were in sync state, can be 
null.
+     * @param lastSyncStateTimeInMs the time we were in sync state.
      *
      * @throws Exception implementations may throw exceptions, but they will be
      *                   logged and will not prevent other listeners from 
being notified
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index f05b58e4a6..c24d63a96f 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -310,6 +310,25 @@ public class HAGroupStoreClient implements Closeable {
             throws IOException,
             InvalidClusterRoleTransitionException,
             SQLException, StaleHAGroupStoreRecordVersionException {
+        setHAGroupStatusIfNeeded(haGroupState, null);
+    }
+
+    /**
+     * Set the HA group status for the specified HA group name.
+     * Checks if the status is needed to be updated based on logic in 
isUpdateNeeded function.
+     *
+     * @param haGroupState the HA group state to set
+     * @param lastSyncTimeInMsNullable the last sync time in milliseconds, can 
be null if not known.
+     * @throws IOException if the client is not healthy or the operation fails
+     * @throws StaleHAGroupStoreRecordVersionException if the version is stale
+     * @throws InvalidClusterRoleTransitionException when transition is not 
valid
+     * @throws SQLException
+     */
+    public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState 
haGroupState,
+                                         Long lastSyncTimeInMsNullable)
+            throws IOException,
+            InvalidClusterRoleTransitionException,
+            SQLException, StaleHAGroupStoreRecordVersionException {
         Preconditions.checkNotNull(haGroupState, "haGroupState cannot be 
null");
         if (!isHealthy) {
             throw new IOException("HAGroupStoreClient is not healthy");
@@ -330,13 +349,15 @@ public class HAGroupStoreClient implements Closeable {
                 // Once state changes back to ACTIVE_IN_SYNC or the role is
                 // NOT ACTIVE or ACTIVE_TO_STANDBY
                 // set the time to null to mark that we are current(or we 
don't have any reader).
-                Long lastSyncTimeInMs = currentHAGroupStoreRecord
-                        .getLastSyncStateTimeInMs();
+            long lastSyncTimeInMs
+                    = lastSyncTimeInMsNullable != null
+                    ? lastSyncTimeInMsNullable
+                    : currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
                 ClusterRole clusterRole = haGroupState.getClusterRole();
                 if (currentHAGroupStoreRecord.getHAGroupState()
                         == HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC
                         && haGroupState == 
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC) {
-                    // We record the last round timestamp by subtracting the 
rotationTime and then 
+                    // We record the last round timestamp by subtracting the 
rotationTime and then
                     // taking the beginning of last round (floor) by first 
integer division and then multiplying again.
                     lastSyncTimeInMs = ((System.currentTimeMillis() - 
rotationTimeMs)/rotationTimeMs) * (rotationTimeMs);
                 }
@@ -1176,4 +1197,4 @@ public class HAGroupStoreClient implements Closeable {
         return clusterType + ":" + targetState;
     }
 
-}
\ No newline at end of file
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
index 2d84e7a284..b358ab5b88 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -83,6 +83,10 @@ public class HAGroupStoreManager {
     /**
      * Functional interface for resolving target local states based on current 
local state
      * when peer cluster transitions occur.
+     * This is used in FailoverManagementListener to determine the target 
state based on the current local state.
+     * <p>
+     * For example if peer transitions from AIS -> ANIS,
+     * the target state changes from STANDBY -> DEGRADED_STANDBY
      */
     @FunctionalInterface
     private interface TargetStateResolver {
@@ -528,21 +532,21 @@ public class HAGroupStoreManager {
      * Subscribe to be notified when any transition to a target state occurs.
      *
      * @param haGroupName the name of the HA group to monitor
-     * @param targetState the target state to watch for
+     * @param toState the target state to watch for
      * @param clusterType whether to monitor local or peer cluster
      * @param listener the listener to notify when any transition to the 
target state occurs
      * @throws IOException if unable to get HAGroupStoreClient instance
      */
     public void subscribeToTargetState(String haGroupName,
-                                       HAGroupStoreRecord.HAGroupState 
targetState,
+                                       HAGroupStoreRecord.HAGroupState toState,
                                        ClusterType clusterType,
                                        HAGroupStateListener listener) throws 
IOException {
         HAGroupStoreClient client
                 = getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
-        client.subscribeToTargetState(targetState, clusterType, listener);
+        client.subscribeToTargetState(toState, clusterType, listener);
         LOGGER.debug("Delegated subscription to target state {} "
                         + "for HA group {} on {} cluster to client",
-                targetState, haGroupName, clusterType);
+                toState, haGroupName, clusterType);
     }
 
     /**
@@ -595,6 +599,21 @@ public class HAGroupStoreManager {
      * Failover management is only set up once per HA group
      * to prevent duplicate subscriptions.
      *
+     * Failover management is responsible for handling the state transitions 
on the local and peer clusters and react accordingly.
+     * Failover management handles peer state transitions and local state 
transitions.
+     *
+     * <p>
+     * Example of peer state transition:
+     * For example, if the peer cluster transitions from ACTIVE_IN_SYNC to 
ACTIVE_NOT_IN_SYNC,
+     * the failover management will transition the local cluster from STANDBY 
to DEGRADED_STANDBY.
+     * </p>
+     *
+     * <p>
+     * Example of local state transition:
+     * For example, if the local cluster transitions to ABORT_TO_STANDBY,
+     * the failover management will transition the local cluster from 
ABORT_TO_STANDBY to STANDBY.
+     * </p>
+     *
      * @param haGroupName name of the HA group
      * @return HAGroupStoreClient instance for the specified HA group
      * @throws IOException when HAGroupStoreClient is not initialized
@@ -619,7 +638,27 @@ public class HAGroupStoreManager {
 
     // ===== Failover Management Related Methods =====
 
-    public void setupLocalFailoverManagement(String haGroupName) throws 
IOException {
+    /**
+     *
+     * Setup local failover management for the given HA group.
+     * Local failover management is responsible for handling the state 
transitions on the local cluster.
+     * Local failover management handles local state transitions.
+     *
+     * <p>
+     * Example of local state transition:
+     * For example, if the local cluster transitions to ABORT_TO_STANDBY,
+     * the failover management will transition the local cluster from 
ABORT_TO_STANDBY to STANDBY.
+     * </p>
+     *
+     * When we subscribe to the target state, we provide a 
FailoverManagementListener instance.
+     * The FailoverManagementListener implements the HAGroupStateListener 
interface and overrides the onStateChange method.
+     * The onStateChange method is called when a state change event occurs.
+     * It is passed the haGroupName, fromState, toState, clusterType, and 
lastSyncStateTimeInMs parameters.
+     * It is responsible for determining the target state and transitioning 
the local cluster to the target state based on target state resolver.
+     * @param haGroupName
+     * @throws IOException
+     */
+    private void setupLocalFailoverManagement(String haGroupName) throws 
IOException {
         HAGroupStoreClient haGroupStoreClient = 
getHAGroupStoreClient(haGroupName);
 
         // Generic subscription loop using static local transition mapping
@@ -637,6 +676,8 @@ public class HAGroupStoreManager {
     /**
      * Listener implementation for handling peer failover management state 
transitions.
      * Subscribes to peer state changes and triggers appropriate local state 
transitions.
+     *
+     *
      */
     private static class FailoverManagementListener implements 
HAGroupStateListener {
         private final HAGroupStoreClient client;
@@ -648,6 +689,24 @@ public class HAGroupStoreManager {
             this.resolver = resolver;
         }
 
+        /*
+
+        <p>
+        Example of peer state transition:
+        For example, if the peer cluster transitions from ACTIVE_IN_SYNC to 
ACTIVE_NOT_IN_SYNC,
+        the failover management will transition the local cluster from STANDBY 
to DEGRADED_STANDBY.
+        </p>
+        Example input will look like this:
+        haGroupName: test-ha-group
+        fromState: ACTIVE_IN_SYNC
+        toState: ACTIVE_NOT_IN_SYNC
+        clusterType: PEER
+        lastSyncStateTimeInMs: 1719859200000
+
+        Based on this input, the failover management will transition the local 
cluster from STANDBY to DEGRADED_STANDBY.
+        The output state will be determined by the TargetStateResolver.
+         */
+
         @Override
         public void onStateChange(String haGroupName,
                                   HAGroupState fromState,
@@ -677,8 +736,16 @@ public class HAGroupStoreManager {
                         return;
                     }
 
+                    // If the target state is STANDBY and we get an event from
+                    // PEER cluster, we copy over the lastSyncTimeInMs from 
PEER event notification.
+                    Long lastSyncTimeInMsNullable = null;
+                    if (targetState.getClusterRole() == ClusterRole.STANDBY
+                            && clusterType == ClusterType.PEER) {
+                        lastSyncTimeInMsNullable = lastSyncStateTimeInMs;
+                    }
+
                     // Execute transition if valid
-                    client.setHAGroupStatusIfNeeded(targetState);
+                    client.setHAGroupStatusIfNeeded(targetState, 
lastSyncTimeInMsNullable);
 
                     LOGGER.info("Failover management transition: peer {} -> 
{}, "
                                     + "local {} -> {} for HA group: {}",
@@ -696,7 +763,20 @@ public class HAGroupStoreManager {
         }
     }
 
-    public void setupPeerFailoverManagement(String haGroupName) throws 
IOException {
+    /**
+     * Setup peer failover management for the given HA group.
+     * Peer failover management is responsible for handling the state 
transitions on the peer cluster.
+     * Peer failover management handles peer state transitions.
+     *
+     * <p>
+     * Example of peer state transition:
+     * For example, if the peer cluster transitions from ACTIVE_IN_SYNC to 
ACTIVE_NOT_IN_SYNC,
+     * the failover management will transition the local cluster from STANDBY 
to DEGRADED_STANDBY.
+     * </p>
+     * @param haGroupName
+     * @throws IOException
+     */
+    private void setupPeerFailoverManagement(String haGroupName) throws 
IOException {
         HAGroupStoreClient haGroupStoreClient = 
getHAGroupStoreClient(haGroupName);
 
         // Generic subscription loop using static transition mapping
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index aeee4f3851..cf3ed2c045 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -558,6 +558,36 @@ public class HAGroupStoreClientIT extends BaseTest {
         
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, 
currentRecord.getHAGroupState());
     }
 
+    @Test
+    public void 
testSetHAGroupStatusIfNeededWithExplicitLastSyncTimeUpdateExistingRecord() 
throws Exception {
+        String haGroupName = testName.getMethodName();
+
+        // Create initial record
+        HAGroupStoreRecord initialRecord = new HAGroupStoreRecord("v1.0", 
haGroupName,
+                HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, 0L, 
HighAvailabilityPolicy.FAILOVER.toString(),
+                this.peerZKUrl, this.masterUrl, this.peerMasterUrl, 0L);
+        createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName, 
initialRecord);
+
+        HAGroupStoreClient haGroupStoreClient = 
HAGroupStoreClient.getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(),
 haGroupName, zkUrl);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Verify initial state
+        HAGroupStoreRecord currentRecord = 
haGroupStoreClient.getHAGroupStoreRecord();
+        assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, 
currentRecord.getHAGroupState());
+
+        // Update to STANDBY (this should succeed as it's a valid transition)
+        long timestamp = System.currentTimeMillis();
+        
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
 timestamp);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Verify the record was updated
+        currentRecord = haGroupStoreClient.getHAGroupStoreRecord();
+        
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, 
currentRecord.getHAGroupState());
+        assertEquals(timestamp, (long) 
currentRecord.getLastSyncStateTimeInMs());
+    }
+    
+    
+
     @Test
     public void testSetHAGroupStatusIfNeededNoUpdateWhenNotNeeded() throws 
Exception {
         String haGroupName = testName.getMethodName();
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
index 5e1306f079..f5729b49f7 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
@@ -50,6 +50,7 @@ import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
 import static 
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -817,6 +818,9 @@ public class HAGroupStoreManagerIT extends BaseTest {
         assertTrue("Cluster2 record should be present", 
cluster2Record.isPresent());
         assertEquals("Cluster2 should be in STANDBY state",
                 HAGroupStoreRecord.HAGroupState.STANDBY, 
cluster2Record.get().getHAGroupState());
+        assertEquals(0L, (long) 
cluster1Record.get().getLastSyncStateTimeInMs());
+        assertEquals(0L, (long) 
cluster2Record.get().getLastSyncStateTimeInMs());
+
 
         // === STEP 1: Transition to store-and-forward mode ===
         // Move cluster1 from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC 
(store-and-forward mode)
@@ -827,7 +831,8 @@ public class HAGroupStoreManagerIT extends BaseTest {
         cluster1Record = cluster1HAManager.getHAGroupStoreRecord(haGroupName);
         assertTrue("Cluster1 record should be present", 
cluster1Record.isPresent());
         assertEquals("Cluster1 should be in ACTIVE_NOT_IN_SYNC state",
-                HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, 
cluster1Record.get().getHAGroupState());
+                HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
+                cluster1Record.get().getHAGroupState());
 
         // === STEP 2: Verify automatic peer reaction to store-and-forward ===
         // Cluster2 (standby) should automatically move from STANDBY to 
DEGRADED_STANDBY
@@ -838,6 +843,9 @@ public class HAGroupStoreManagerIT extends BaseTest {
         assertTrue("Cluster2 record should be present", 
cluster2Record.isPresent());
         assertEquals("Cluster2 should automatically transition to 
DEGRADED_STANDBY",
                 HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, 
cluster2Record.get().getHAGroupState());
+        assertNotEquals(0L, (long) 
cluster1Record.get().getLastSyncStateTimeInMs());
+        assertEquals(cluster2Record.get().getLastSyncStateTimeInMs(),
+                cluster1Record.get().getLastSyncStateTimeInMs());
 
         // === STEP 3: Return to sync mode ===
         // Move cluster1 back from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC
@@ -850,7 +858,8 @@ public class HAGroupStoreManagerIT extends BaseTest {
         cluster1Record = cluster1HAManager.getHAGroupStoreRecord(haGroupName);
         assertTrue("Cluster1 record should be present", 
cluster1Record.isPresent());
         assertEquals("Cluster1 should be back in ACTIVE_IN_SYNC state",
-                HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, 
cluster1Record.get().getHAGroupState());
+                HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
+                cluster1Record.get().getHAGroupState());
 
         // === STEP 4: Verify automatic peer recovery ===
         // Cluster2 should automatically move from DEGRADED_STANDBY back to 
STANDBY
@@ -861,5 +870,8 @@ public class HAGroupStoreManagerIT extends BaseTest {
         assertTrue("Cluster2 record should be present", 
cluster2Record.isPresent());
         assertEquals("Cluster2 should automatically transition back to 
STANDBY",
                 HAGroupStoreRecord.HAGroupState.STANDBY, 
cluster2Record.get().getHAGroupState());
+        assertNotEquals(0L, (long) 
cluster1Record.get().getLastSyncStateTimeInMs());
+        assertEquals(cluster2Record.get().getLastSyncStateTimeInMs(),
+                cluster1Record.get().getLastSyncStateTimeInMs());
     }
 }
\ No newline at end of file

Reply via email to