This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this
push:
new 46889cffb8 PHOENIX-7566 Fix to copy over lastUpdatedTimeInMs from
ACTIVE events to STANDBY cluster (#2309)
46889cffb8 is described below
commit 46889cffb85661eee59c71a2942eaec39d93ec78
Author: ritegarg <[email protected]>
AuthorDate: Mon Nov 3 13:40:29 2025 -0800
PHOENIX-7566 Fix to copy over lastUpdatedTimeInMs from ACTIVE events to
STANDBY cluster (#2309)
Co-authored-by: Ritesh Garg
<[email protected]>
---
.../apache/phoenix/jdbc/HAGroupStateListener.java | 19 ++++-
.../apache/phoenix/jdbc/HAGroupStoreClient.java | 29 ++++++-
.../apache/phoenix/jdbc/HAGroupStoreManager.java | 94 ++++++++++++++++++++--
.../apache/phoenix/jdbc/HAGroupStoreClientIT.java | 30 +++++++
.../apache/phoenix/jdbc/HAGroupStoreManagerIT.java | 16 +++-
5 files changed, 173 insertions(+), 15 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
index 1446514f8c..ad2d1620d6 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
@@ -36,8 +36,23 @@ import
org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
public interface HAGroupStateListener {
/**
+ * <p>
* Called when an HA group state transition occurs.
- *
+ * The ZK client listens for changes and sends updates to subscribers on a single thread to
+ * guarantee the ordering of events. Subscribers receive the state transition callback
+ * through this method.
+ * </p>
+ * <p>
+ * For example, if a subscriber has subscribed to the ACTIVE_NOT_IN_SYNC state on the peer cluster,
+ * and the state transition happens from ACTIVE_IN_SYNC to
ACTIVE_NOT_IN_SYNC,
+ * the subscriber will get the callback with the following parameters:
+ * - haGroupName: the name of the HA group that transitioned
+ * - fromState: ACTIVE_IN_SYNC
+ * - toState: ACTIVE_NOT_IN_SYNC
+ * - modifiedTime: the time the state transition occurred
+ * - clusterType: PEER
+ * - lastSyncStateTimeInMs: the time we were in sync state.
+ * </p>
* <p>Implementations should be fast and non-blocking to avoid impacting
* the HA group state management system. If heavy processing is required,
* consider delegating to a separate thread.</p>
@@ -50,7 +65,7 @@ public interface HAGroupStateListener {
* @param toState the new state after the transition
* @param modifiedTime the time the state transition occurred
* @param clusterType whether this transition occurred on the local or
peer cluster
- * @param lastSyncStateTimeInMs the time we were in sync state, can be
null.
+ * @param lastSyncStateTimeInMs the time we were in sync state.
*
* @throws Exception implementations may throw exceptions, but they will be
* logged and will not prevent other listeners from
being notified
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index f05b58e4a6..c24d63a96f 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -310,6 +310,25 @@ public class HAGroupStoreClient implements Closeable {
throws IOException,
InvalidClusterRoleTransitionException,
SQLException, StaleHAGroupStoreRecordVersionException {
+ setHAGroupStatusIfNeeded(haGroupState, null);
+ }
+
+ /**
+ * Set the HA group status for the specified HA group name.
+ * Checks whether the status needs to be updated, based on the logic in the isUpdateNeeded function.
+ *
+ * @param haGroupState the HA group state to set
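+ * @param lastSyncTimeInMsNullable the last sync time in milliseconds, or null if not known; when null, the last sync time from the current record is used (an ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC transition recomputes it regardless).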
+ * @throws IOException if the client is not healthy or the operation fails
+ * @throws StaleHAGroupStoreRecordVersionException if the version is stale
+ * @throws InvalidClusterRoleTransitionException when transition is not
valid
+ * @throws SQLException
+ */
+ public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState
haGroupState,
+ Long lastSyncTimeInMsNullable)
+ throws IOException,
+ InvalidClusterRoleTransitionException,
+ SQLException, StaleHAGroupStoreRecordVersionException {
Preconditions.checkNotNull(haGroupState, "haGroupState cannot be
null");
if (!isHealthy) {
throw new IOException("HAGroupStoreClient is not healthy");
@@ -330,13 +349,15 @@ public class HAGroupStoreClient implements Closeable {
// Once state changes back to ACTIVE_IN_SYNC or the role is
// NOT ACTIVE or ACTIVE_TO_STANDBY
// set the time to null to mark that we are current(or we
don't have any reader).
- Long lastSyncTimeInMs = currentHAGroupStoreRecord
- .getLastSyncStateTimeInMs();
+ long lastSyncTimeInMs
+ = lastSyncTimeInMsNullable != null
+ ? lastSyncTimeInMsNullable
+ : currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
ClusterRole clusterRole = haGroupState.getClusterRole();
if (currentHAGroupStoreRecord.getHAGroupState()
== HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC
&& haGroupState ==
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC) {
- // We record the last round timestamp by subtracting the
rotationTime and then
+ // We record the last round timestamp by subtracting the
rotationTime and then
// taking the beginning of last round (floor) by first
integer division and then multiplying again.
lastSyncTimeInMs = ((System.currentTimeMillis() -
rotationTimeMs)/rotationTimeMs) * (rotationTimeMs);
}
@@ -1176,4 +1197,4 @@ public class HAGroupStoreClient implements Closeable {
return clusterType + ":" + targetState;
}
-}
\ No newline at end of file
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
index 2d84e7a284..b358ab5b88 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -83,6 +83,10 @@ public class HAGroupStoreManager {
/**
* Functional interface for resolving target local states based on current
local state
* when peer cluster transitions occur.
+ * This is used in FailoverManagementListener to determine the target
state based on the current local state.
+ * <p>
+ * For example, if the peer transitions from ACTIVE_IN_SYNC (AIS) to ACTIVE_NOT_IN_SYNC (ANIS),
+ * the local target state changes from STANDBY to DEGRADED_STANDBY.
*/
@FunctionalInterface
private interface TargetStateResolver {
@@ -528,21 +532,21 @@ public class HAGroupStoreManager {
* Subscribe to be notified when any transition to a target state occurs.
*
* @param haGroupName the name of the HA group to monitor
- * @param targetState the target state to watch for
+ * @param toState the target state to watch for
* @param clusterType whether to monitor local or peer cluster
* @param listener the listener to notify when any transition to the
target state occurs
* @throws IOException if unable to get HAGroupStoreClient instance
*/
public void subscribeToTargetState(String haGroupName,
- HAGroupStoreRecord.HAGroupState
targetState,
+ HAGroupStoreRecord.HAGroupState toState,
ClusterType clusterType,
HAGroupStateListener listener) throws
IOException {
HAGroupStoreClient client
= getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
- client.subscribeToTargetState(targetState, clusterType, listener);
+ client.subscribeToTargetState(toState, clusterType, listener);
LOGGER.debug("Delegated subscription to target state {} "
+ "for HA group {} on {} cluster to client",
- targetState, haGroupName, clusterType);
+ toState, haGroupName, clusterType);
}
/**
@@ -595,6 +599,21 @@ public class HAGroupStoreManager {
* Failover management is only set up once per HA group
* to prevent duplicate subscriptions.
*
+ * Failover management is responsible for handling state transitions on both the local and peer clusters and reacting to them accordingly.
+ *
+ * <p>
+ * Example of peer state transition:
+ * For example, if the peer cluster transitions from ACTIVE_IN_SYNC to
ACTIVE_NOT_IN_SYNC,
+ * the failover management will transition the local cluster from STANDBY
to DEGRADED_STANDBY.
+ * </p>
+ *
+ * <p>
+ * Example of local state transition:
+ * For example, if the local cluster transitions to ABORT_TO_STANDBY,
+ * the failover management will transition the local cluster from
ABORT_TO_STANDBY to STANDBY.
+ * </p>
+ *
* @param haGroupName name of the HA group
* @return HAGroupStoreClient instance for the specified HA group
* @throws IOException when HAGroupStoreClient is not initialized
@@ -619,7 +638,27 @@ public class HAGroupStoreManager {
// ===== Failover Management Related Methods =====
- public void setupLocalFailoverManagement(String haGroupName) throws
IOException {
+ /**
+ * Sets up local failover management for the given HA group.
+ * Local failover management is responsible for handling state transitions on the local cluster.
+ *
+ * <p>
+ * Example of local state transition:
+ * For example, if the local cluster transitions to ABORT_TO_STANDBY,
+ * the failover management will transition the local cluster from
ABORT_TO_STANDBY to STANDBY.
+ * </p>
+ *
+ * When we subscribe to the target state, we provide a
FailoverManagementListener instance.
+ * The FailoverManagementListener implements the HAGroupStateListener
interface and overrides the onStateChange method.
+ * The onStateChange method is called when a state change event occurs.
+ * It receives the haGroupName, fromState, toState, modifiedTime, clusterType, and lastSyncStateTimeInMs parameters.
+ * It is responsible for determining the target state via the TargetStateResolver and transitioning the local cluster to that state.
+ * @param haGroupName name of the HA group
+ * @throws IOException
+ */
+ private void setupLocalFailoverManagement(String haGroupName) throws
IOException {
HAGroupStoreClient haGroupStoreClient =
getHAGroupStoreClient(haGroupName);
// Generic subscription loop using static local transition mapping
@@ -637,6 +676,8 @@ public class HAGroupStoreManager {
/**
* Listener implementation for handling peer failover management state
transitions.
* Subscribes to peer state changes and triggers appropriate local state
transitions.
+ *
+ *
*/
private static class FailoverManagementListener implements
HAGroupStateListener {
private final HAGroupStoreClient client;
@@ -648,6 +689,24 @@ public class HAGroupStoreManager {
this.resolver = resolver;
}
+ /*
+
+ <p>
+ Example of peer state transition:
+ For example, if the peer cluster transitions from ACTIVE_IN_SYNC to
ACTIVE_NOT_IN_SYNC,
+ the failover management will transition the local cluster from STANDBY
to DEGRADED_STANDBY.
+ </p>
+ Example input will look like this:
+ haGroupName: test-ha-group
+ fromState: ACTIVE_IN_SYNC
+ toState: ACTIVE_NOT_IN_SYNC
+ clusterType: PEER
+ lastSyncStateTimeInMs: 1719859200000
+
+ Based on this input, and assuming the local cluster is currently in STANDBY, the failover management will transition the local cluster from STANDBY to DEGRADED_STANDBY.
+ The target state is determined by the TargetStateResolver from the current local state.
+ */
+
@Override
public void onStateChange(String haGroupName,
HAGroupState fromState,
@@ -677,8 +736,16 @@ public class HAGroupStoreManager {
return;
}
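+ // If the target state has the STANDBY cluster role (e.g. STANDBY or DEGRADED_STANDBY)
+ // and the event comes from the PEER cluster, copy over lastSyncStateTimeInMs from the
+ // PEER event notification. Otherwise pass null so the client keeps its current value.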
+ Long lastSyncTimeInMsNullable = null;
+ if (targetState.getClusterRole() == ClusterRole.STANDBY
+ && clusterType == ClusterType.PEER) {
+ lastSyncTimeInMsNullable = lastSyncStateTimeInMs;
+ }
+
// Execute transition if valid
- client.setHAGroupStatusIfNeeded(targetState);
+ client.setHAGroupStatusIfNeeded(targetState,
lastSyncTimeInMsNullable);
LOGGER.info("Failover management transition: peer {} ->
{}, "
+ "local {} -> {} for HA group: {}",
@@ -696,7 +763,20 @@ public class HAGroupStoreManager {
}
}
- public void setupPeerFailoverManagement(String haGroupName) throws
IOException {
+ /**
+ * Sets up peer failover management for the given HA group.
+ * Peer failover management reacts to state transitions on the peer cluster
+ * by triggering the appropriate state transitions on the local cluster.
+ *
+ * <p>
+ * Example of peer state transition:
+ * For example, if the peer cluster transitions from ACTIVE_IN_SYNC to
ACTIVE_NOT_IN_SYNC,
+ * the failover management will transition the local cluster from STANDBY
to DEGRADED_STANDBY.
+ * </p>
+ * @param haGroupName name of the HA group
+ * @throws IOException
+ */
+ private void setupPeerFailoverManagement(String haGroupName) throws
IOException {
HAGroupStoreClient haGroupStoreClient =
getHAGroupStoreClient(haGroupName);
// Generic subscription loop using static transition mapping
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index aeee4f3851..cf3ed2c045 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -558,6 +558,36 @@ public class HAGroupStoreClientIT extends BaseTest {
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
currentRecord.getHAGroupState());
}
+ @Test
+ public void
testSetHAGroupStatusIfNeededWithExplicitLastSyncTimeUpdateExistingRecord()
throws Exception {
+ String haGroupName = testName.getMethodName();
+
+ // Create initial record
+ HAGroupStoreRecord initialRecord = new HAGroupStoreRecord("v1.0",
haGroupName,
+ HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, 0L,
HighAvailabilityPolicy.FAILOVER.toString(),
+ this.peerZKUrl, this.masterUrl, this.peerMasterUrl, 0L);
+ createOrUpdateHAGroupStoreRecordOnZookeeper(haAdmin, haGroupName,
initialRecord);
+
+ HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(),
haGroupName, zkUrl);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify initial state
+ HAGroupStoreRecord currentRecord =
haGroupStoreClient.getHAGroupStoreRecord();
+ assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
currentRecord.getHAGroupState());
+
+ // Update to ACTIVE_IN_SYNC_TO_STANDBY (a valid transition from ACTIVE_IN_SYNC) with an explicit lastSyncTime
+ long timestamp = System.currentTimeMillis();
+
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
timestamp);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify the record was updated
+ currentRecord = haGroupStoreClient.getHAGroupStoreRecord();
+
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
currentRecord.getHAGroupState());
+ assertEquals(timestamp, (long)
currentRecord.getLastSyncStateTimeInMs());
+ }
+
+
+
@Test
public void testSetHAGroupStatusIfNeededNoUpdateWhenNotNeeded() throws
Exception {
String haGroupName = testName.getMethodName();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
index 5e1306f079..f5729b49f7 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
@@ -50,6 +50,7 @@ import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -817,6 +818,9 @@ public class HAGroupStoreManagerIT extends BaseTest {
assertTrue("Cluster2 record should be present",
cluster2Record.isPresent());
assertEquals("Cluster2 should be in STANDBY state",
HAGroupStoreRecord.HAGroupState.STANDBY,
cluster2Record.get().getHAGroupState());
+ assertEquals(0L, (long)
cluster1Record.get().getLastSyncStateTimeInMs());
+ assertEquals(0L, (long)
cluster2Record.get().getLastSyncStateTimeInMs());
+
// === STEP 1: Transition to store-and-forward mode ===
// Move cluster1 from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC
(store-and-forward mode)
@@ -827,7 +831,8 @@ public class HAGroupStoreManagerIT extends BaseTest {
cluster1Record = cluster1HAManager.getHAGroupStoreRecord(haGroupName);
assertTrue("Cluster1 record should be present",
cluster1Record.isPresent());
assertEquals("Cluster1 should be in ACTIVE_NOT_IN_SYNC state",
- HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
cluster1Record.get().getHAGroupState());
+ HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
+ cluster1Record.get().getHAGroupState());
// === STEP 2: Verify automatic peer reaction to store-and-forward ===
// Cluster2 (standby) should automatically move from STANDBY to
DEGRADED_STANDBY
@@ -838,6 +843,9 @@ public class HAGroupStoreManagerIT extends BaseTest {
assertTrue("Cluster2 record should be present",
cluster2Record.isPresent());
assertEquals("Cluster2 should automatically transition to
DEGRADED_STANDBY",
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY,
cluster2Record.get().getHAGroupState());
+ assertNotEquals(0L, (long)
cluster1Record.get().getLastSyncStateTimeInMs());
+ assertEquals(cluster2Record.get().getLastSyncStateTimeInMs(),
+ cluster1Record.get().getLastSyncStateTimeInMs());
// === STEP 3: Return to sync mode ===
// Move cluster1 back from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC
@@ -850,7 +858,8 @@ public class HAGroupStoreManagerIT extends BaseTest {
cluster1Record = cluster1HAManager.getHAGroupStoreRecord(haGroupName);
assertTrue("Cluster1 record should be present",
cluster1Record.isPresent());
assertEquals("Cluster1 should be back in ACTIVE_IN_SYNC state",
- HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
cluster1Record.get().getHAGroupState());
+ HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
+ cluster1Record.get().getHAGroupState());
// === STEP 4: Verify automatic peer recovery ===
// Cluster2 should automatically move from DEGRADED_STANDBY back to
STANDBY
@@ -861,5 +870,8 @@ public class HAGroupStoreManagerIT extends BaseTest {
assertTrue("Cluster2 record should be present",
cluster2Record.isPresent());
assertEquals("Cluster2 should automatically transition back to
STANDBY",
HAGroupStoreRecord.HAGroupState.STANDBY,
cluster2Record.get().getHAGroupState());
+ assertNotEquals(0L, (long)
cluster1Record.get().getLastSyncStateTimeInMs());
+ assertEquals(cluster2Record.get().getLastSyncStateTimeInMs(),
+ cluster1Record.get().getLastSyncStateTimeInMs());
}
}
\ No newline at end of file