This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new 1e1b9e52ca PHOENIX-7566 HAGroupState subscription feature and state management for ReplicationLogReader (#2274) (#2340)
1e1b9e52ca is described below

commit 1e1b9e52ca2fb6f2cd11be7959af0bc46109b57a
Author: Lokesh Khurana <[email protected]>
AuthorDate: Wed Jan 7 10:42:14 2026 -0800

    PHOENIX-7566 HAGroupState subscription feature and state management for ReplicationLogReader (#2274) (#2340)
---
 .../java/org/apache/phoenix/jdbc/ClusterType.java  |  34 +
 .../apache/phoenix/jdbc/HAGroupStateListener.java  |  54 ++
 .../apache/phoenix/jdbc/HAGroupStoreClient.java    | 236 +++++--
 .../apache/phoenix/jdbc/HAGroupStoreManager.java   | 186 ++++--
 .../apache/phoenix/jdbc/HAGroupStoreRecord.java    |  26 +-
 ...gionServerEndpointITWithConsistentFailover.java |   4 +-
 .../IndexRegionObserverMutationBlockingIT.java     |   4 +-
 .../phoenix/jdbc/HAGroupStateSubscriptionIT.java   | 717 +++++++++++++++++++++
 .../apache/phoenix/jdbc/HAGroupStoreClientIT.java  |   8 +-
 .../apache/phoenix/jdbc/HAGroupStoreManagerIT.java | 302 ++++++++-
 .../org/apache/phoenix/jdbc/PhoenixHAAdminIT.java  |   6 +-
 .../apache/phoenix/util/HAGroupStoreTestUtil.java  |  16 +
 12 files changed, 1473 insertions(+), 120 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java
new file mode 100644
index 0000000000..95d021a7b5
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+/**
+ * Enumeration representing the type of cluster in an HA group configuration. Used to distinguish
+ * between local and peer clusters when subscribing to HA group state change notifications.
+ */
+public enum ClusterType {
+  /**
+   * Represents the local cluster where the client is running.
+   */
+  LOCAL,
+
+  /**
+   * Represents the peer cluster in the HA group configuration.
+   */
+  PEER
+}
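
For context, a brief hedged sketch of how this enum is meant to be used by a subscriber to filter notifications to one cluster (the listener body below is illustrative only, not part of this commit):

    // Hypothetical listener that only reacts to transitions observed on the peer cluster.
    HAGroupStateListener peerOnlyListener = (groupName, toState, modifiedTime, clusterType) -> {
      if (clusterType == ClusterType.PEER) {
        // react to the peer cluster reaching toState
      }
    };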
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
new file mode 100644
index 0000000000..0fbdbf5b5e
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
+
+/**
+ * Interface for external clients who want to be notified of HA group state transitions.
+ * <p>
+ * Listeners can subscribe to be notified when:
+ * </p>
+ * <ul>
+ * <li>Specific state transitions occur (from one state to another)</li>
+ * <li>Any transition to a target state occurs (from any state to a specific state)</li>
+ * </ul>
+ * <p>
+ * Notifications are provided for both local and peer cluster state changes, distinguished by the
+ * {@link ClusterType} parameter.
+ * </p>
+ * @see HAGroupStoreManager#subscribeToTargetState
+ */
+public interface HAGroupStateListener {
+
+  /**
+   * Called when an HA group state transition occurs.
+   * <p>
+   * Implementations should be fast and non-blocking to avoid impacting the HA group state
+   * management system. If heavy processing is required, consider delegating to a separate thread.
+   * </p>
+   * @param haGroupName  the name of the HA group that transitioned
+   * @param toState      the new state after the transition
+   * @param modifiedTime the time the state transition occurred
+   * @param clusterType  whether this transition occurred on the local or peer cluster
+   * @throws Exception implementations may throw exceptions, but they will be logged and will not
+   *                   prevent other listeners from being notified
+   */
+  void onStateChange(String haGroupName, HAGroupState toState, long modifiedTime,
+    ClusterType clusterType);
+}
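
A minimal sketch of a listener that follows the non-blocking guidance in the Javadoc above by handing work to its own executor (the class, executor, and handler names are hypothetical and assume a package that can see the org.apache.phoenix.jdbc types):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AsyncHAGroupStateListener implements HAGroupStateListener {
      // Single worker thread so the callback returns to the cache's notification thread quickly.
      private final ExecutorService worker = Executors.newSingleThreadExecutor();

      @Override
      public void onStateChange(String haGroupName, HAGroupStoreRecord.HAGroupState toState,
          long modifiedTime, ClusterType clusterType) {
        // Keep the callback cheap; do the real work off the notification thread.
        worker.submit(() -> handleTransition(haGroupName, toState, modifiedTime, clusterType));
      }

      private void handleTransition(String haGroupName, HAGroupStoreRecord.HAGroupState toState,
          long modifiedTime, ClusterType clusterType) {
        // Hypothetical heavy processing (e.g. pausing a reader) goes here.
      }
    }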
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index 64d76b9e6d..70bd4b43f0 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -42,10 +42,13 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
@@ -75,14 +78,13 @@ import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
  */
 public class HAGroupStoreClient implements Closeable {
 
-  public static final String ZK_CONSISTENT_HA_NAMESPACE =
-    "phoenix" + ZKPaths.PATH_SEPARATOR + "consistentHA";
+  public static final String ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE =
+    "phoenix" + ZKPaths.PATH_SEPARATOR + "consistentHA" + ZKPaths.PATH_SEPARATOR + "groupState";
   private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS = 30000L;
   // Multiplier for ZK session timeout to account for time it will take for HMaster to abort
   // the region server in case ZK connection is lost from the region server.
-  private static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1;
-  private static final String CACHE_TYPE_LOCAL = "LOCAL";
-  private static final String CACHE_TYPE_PEER = "PEER";
+  @VisibleForTesting
+  static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1;
   private PhoenixHAAdmin phoenixHaAdmin;
   private PhoenixHAAdmin peerPhoenixHaAdmin;
   private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class);
@@ -107,6 +109,14 @@ public class HAGroupStoreClient implements Closeable {
   private final PathChildrenCacheListener peerCustomPathChildrenCacheListener;
   // Wait time for sync mode
   private final long waitTimeForSyncModeInMs;
+  // State tracking for transition detection
+  private volatile HAGroupState lastKnownLocalState;
+  private volatile HAGroupState lastKnownPeerState;
+
+  // Subscription storage for HA group state change notifications per client instance
+  // Map key format: "clusterType:targetState" -> Set<Listeners>
+  private final ConcurrentHashMap<String,
+    CopyOnWriteArraySet<HAGroupStateListener>> targetStateSubscribers = new ConcurrentHashMap<>();
   // Policy for the HA group
   private HighAvailabilityPolicy policy;
   private ClusterRole clusterRole;
@@ -130,7 +140,7 @@ public class HAGroupStoreClient implements Closeable {
    * @return HAGroupStoreClient instance
    */
   public static HAGroupStoreClient getInstanceForZkUrl(Configuration conf, String haGroupName,
-    String zkUrl) throws SQLException {
+    String zkUrl) {
     Preconditions.checkNotNull(haGroupName, "haGroupName cannot be null");
     String localZkUrl = Objects.toString(zkUrl, getLocalZkUrl(conf));
     Preconditions.checkNotNull(localZkUrl, "zkUrl cannot be null");
@@ -157,7 +167,7 @@ public class HAGroupStoreClient implements Closeable {
 
   /**
    * Get the list of HAGroupNames from system table. We can also get the list of HAGroupNames from
-   * the system table by providing the zkUrl in where clause but we need to match the formatted
+   * the system table by providing the zkUrl in where clause, but we need to match the formatted
    * zkUrl with the zkUrl in the system table so that matching is done correctly.
    * @param zkUrl for connecting to Table
    * @return the list of HAGroupNames
@@ -204,10 +214,11 @@ public class HAGroupStoreClient implements Closeable {
       // Initialize HAGroupStoreClient attributes
       initializeHAGroupStoreClientAttributes(haGroupName);
       // Initialize Phoenix HA Admin
-      this.phoenixHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, ZK_CONSISTENT_HA_NAMESPACE);
+      this.phoenixHaAdmin =
+        new PhoenixHAAdmin(this.zkUrl, conf, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
       // Initialize local cache
       this.pathChildrenCache =
-        initializePathChildrenCache(phoenixHaAdmin, pathChildrenCacheListener, CACHE_TYPE_LOCAL);
+        initializePathChildrenCache(phoenixHaAdmin, pathChildrenCacheListener, ClusterType.LOCAL);
       // Initialize ZNode if not present in ZK
       initializeZNodeIfNeeded();
       if (this.pathChildrenCache != null) {
@@ -237,7 +248,6 @@ public class HAGroupStoreClient implements Closeable {
     initializeHAGroupStoreClientAttributes(haGroupName);
     initializeZNodeIfNeeded();
     maybeInitializePeerPathChildrenCache();
-
     // NOTE: this is a BLOCKING method.
     // Completely rebuild the internal cache by querying for all needed data
     // WITHOUT generating any events to send to listeners.
@@ -259,7 +269,7 @@ public class HAGroupStoreClient implements Closeable {
     if (!isHealthy) {
       throw new IOException("HAGroupStoreClient is not healthy");
     }
-    return fetchCacheRecord(this.pathChildrenCache, CACHE_TYPE_LOCAL).getLeft();
+    return fetchCacheRecord(this.pathChildrenCache, ClusterType.LOCAL).getLeft();
   }
 
   /**
@@ -279,7 +289,7 @@ public class HAGroupStoreClient implements Closeable {
       throw new IOException("HAGroupStoreClient is not healthy");
     }
     Pair<HAGroupStoreRecord, Stat> cacheRecord =
-      fetchCacheRecord(this.pathChildrenCache, CACHE_TYPE_LOCAL);
+      fetchCacheRecord(this.pathChildrenCache, ClusterType.LOCAL);
     HAGroupStoreRecord currentHAGroupStoreRecord = cacheRecord.getLeft();
     Stat currentHAGroupStoreRecordStat = cacheRecord.getRight();
     if (currentHAGroupStoreRecord == null) {
@@ -291,9 +301,29 @@ public class HAGroupStoreClient implements Closeable {
       isUpdateNeeded(currentHAGroupStoreRecord.getHAGroupState(),
         currentHAGroupStoreRecordStat.getMtime(), haGroupState)
     ) {
+      // We maintain last sync time as the last time cluster was in sync state.
+      // If state changes from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, record that time
+      // Once state changes back to ACTIVE_IN_SYNC or the role is
+      // NOT ACTIVE or ACTIVE_TO_STANDBY
+      // set the time to null to mark that we are current (or we don't have any reader).
+      // TODO: Verify that for reader this is the correct approach.
+      Long lastSyncTimeInMs = currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
+      ClusterRole clusterRole = haGroupState.getClusterRole();
+      if (
+        currentHAGroupStoreRecord.getHAGroupState()
+            == HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC
+          && haGroupState == HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC
+      ) {
+        lastSyncTimeInMs = System.currentTimeMillis();
+      } else if (
+        haGroupState == HAGroupState.ACTIVE_IN_SYNC || !(ClusterRole.ACTIVE.equals(clusterRole)
+          || ClusterRole.ACTIVE_TO_STANDBY.equals(clusterRole))
+      ) {
+        lastSyncTimeInMs = null;
+      }
       HAGroupStoreRecord newHAGroupStoreRecord =
         new HAGroupStoreRecord(currentHAGroupStoreRecord.getProtocolVersion(),
-          currentHAGroupStoreRecord.getHaGroupName(), haGroupState);
+          currentHAGroupStoreRecord.getHaGroupName(), haGroupState, lastSyncTimeInMs);
       // TODO: Check if cluster role is changing, if so, we need to update
       // the system table first
       // Lock the row in System Table and make sure update is reflected
@@ -326,11 +356,11 @@ public class HAGroupStoreClient implements Closeable {
    * @return HAGroupStoreRecord for the specified HA group name, or null if not found
    * @throws IOException if the client is not healthy
    */
-  private HAGroupStoreRecord getHAGroupStoreRecordFromPeer() throws IOException {
+  public HAGroupStoreRecord getHAGroupStoreRecordFromPeer() throws IOException {
     if (!isHealthy) {
       throw new IOException("HAGroupStoreClient is not healthy");
     }
-    return fetchCacheRecord(this.peerPathChildrenCache, CACHE_TYPE_PEER).getLeft();
+    return fetchCacheRecord(this.peerPathChildrenCache, ClusterType.PEER).getLeft();
   }
 
   private void initializeZNodeIfNeeded()
@@ -340,11 +370,18 @@ public class HAGroupStoreClient implements Closeable {
     Pair<HAGroupStoreRecord, Stat> cacheRecordFromZK =
       phoenixHaAdmin.getHAGroupStoreRecordInZooKeeper(this.haGroupName);
     HAGroupStoreRecord haGroupStoreRecord = cacheRecordFromZK.getLeft();
+    HAGroupState defaultHAGroupState = this.clusterRole.getDefaultHAGroupState();
+    // Initialize lastSyncTimeInMs only if we start in ACTIVE_NOT_IN_SYNC state
+    // and ZNode is not already present
+    Long lastSyncTimeInMs = defaultHAGroupState.equals(HAGroupState.ACTIVE_NOT_IN_SYNC)
+      ? System.currentTimeMillis()
+      : null;
     HAGroupStoreRecord newHAGroupStoreRecord =
       new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName,
-        this.clusterRole.getDefaultHAGroupState());
+        this.clusterRole.getDefaultHAGroupState(), lastSyncTimeInMs);
     // Only update current ZNode if it doesn't have the same role as present in System Table.
     // If not exists, then create ZNode
+    // TODO: Discuss if this approach is what reader needs.
     if (
       haGroupStoreRecord != null && !haGroupStoreRecord.getClusterRole().equals(this.clusterRole)
     ) {
@@ -414,17 +451,17 @@ public class HAGroupStoreClient implements Closeable {
       try {
         // Setup peer connection if needed (first time or ZK Url changed)
         if (
-          peerPathChildrenCache == null || (peerPhoenixHaAdmin != null
-            && !StringUtils.equals(this.peerZKUrl, peerPhoenixHaAdmin.getZkUrl()))
+          peerPathChildrenCache == null || peerPhoenixHaAdmin != null
+            && !StringUtils.equals(this.peerZKUrl, peerPhoenixHaAdmin.getZkUrl())
         ) {
           // Clean up existing peer connection if it exists
           closePeerConnection();
           // Setup new peer connection
           this.peerPhoenixHaAdmin =
-            new PhoenixHAAdmin(this.peerZKUrl, conf, ZK_CONSISTENT_HA_NAMESPACE);
+            new PhoenixHAAdmin(this.peerZKUrl, conf, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
           // Create new PeerPathChildrenCache
           this.peerPathChildrenCache = initializePathChildrenCache(peerPhoenixHaAdmin,
-            this.peerCustomPathChildrenCacheListener, CACHE_TYPE_PEER);
+            this.peerCustomPathChildrenCacheListener, ClusterType.PEER);
         }
       } catch (Exception e) {
         closePeerConnection();
@@ -442,7 +479,7 @@ public class HAGroupStoreClient implements Closeable {
   }
 
   private PathChildrenCache initializePathChildrenCache(PhoenixHAAdmin admin,
-    PathChildrenCacheListener customListener, String cacheType) {
+    PathChildrenCacheListener customListener, ClusterType cacheType) {
     LOGGER.info("Initializing {} PathChildrenCache with URL {}", cacheType, 
admin.getZkUrl());
     PathChildrenCache newPathChildrenCache = null;
     try {
@@ -468,29 +505,37 @@ public class HAGroupStoreClient implements Closeable {
     }
   }
 
-  private PathChildrenCacheListener createCacheListener(CountDownLatch latch, String cacheType) {
+  private PathChildrenCacheListener createCacheListener(CountDownLatch latch,
+    ClusterType cacheType) {
     return (client, event) -> {
       final ChildData childData = event.getData();
-      HAGroupStoreRecord eventRecord = extractHAGroupStoreRecordOrNull(childData);
+      Pair<HAGroupStoreRecord, Stat> eventRecordAndStat =
+        extractHAGroupStoreRecordOrNull(childData);
+      HAGroupStoreRecord eventRecord = eventRecordAndStat.getLeft();
+      Stat eventStat = eventRecordAndStat.getRight();
       LOGGER.info("HAGroupStoreClient Cache {} received event {} type {} at {}", cacheType,
         eventRecord, event.getType(), System.currentTimeMillis());
       switch (event.getType()) {
-        // TODO: Add support for event watcher for HAGroupStoreRecord
-        // case CHILD_ADDED:
-        // case CHILD_UPDATED:
-        // case CHILD_REMOVED:
+        case CHILD_ADDED:
+        case CHILD_UPDATED:
+          if (eventRecord != null) {
+            handleStateChange(eventRecord, eventStat, cacheType);
+          }
+          break;
+        case CHILD_REMOVED:
+          break;
         case INITIALIZED:
           latch.countDown();
           break;
         case CONNECTION_LOST:
         case CONNECTION_SUSPENDED:
-          if (CACHE_TYPE_LOCAL.equals(cacheType)) {
+          if (ClusterType.LOCAL.equals(cacheType)) {
             isHealthy = false;
           }
           LOGGER.warn("{} HAGroupStoreClient cache connection lost/suspended", 
cacheType);
           break;
         case CONNECTION_RECONNECTED:
-          if (CACHE_TYPE_LOCAL.equals(cacheType)) {
+          if (ClusterType.LOCAL.equals(cacheType)) {
             isHealthy = true;
           }
           LOGGER.info("{} HAGroupStoreClient cache connection reconnected", 
cacheType);
@@ -503,7 +548,7 @@ public class HAGroupStoreClient implements Closeable {
   }
 
   private Pair<HAGroupStoreRecord, Stat> fetchCacheRecord(PathChildrenCache cache,
-    String cacheType) {
+    ClusterType cacheType) {
     if (cache == null) {
       LOGGER.warn("{} HAGroupStoreClient cache is null, returning null", cacheType);
       return Pair.of(null, null);
@@ -516,7 +561,7 @@ public class HAGroupStoreClient implements Closeable {
       return result;
     }
 
-    if (cacheType.equals(CACHE_TYPE_PEER)) {
+    if (cacheType.equals(ClusterType.PEER)) {
       return Pair.of(null, null);
     }
     // If no record found, try to rebuild and fetch again
@@ -533,23 +578,23 @@ public class HAGroupStoreClient implements Closeable {
   }
 
   private Pair<HAGroupStoreRecord, Stat> extractRecordAndStat(PathChildrenCache cache,
-    String targetPath, String cacheType) {
+    String targetPath, ClusterType cacheType) {
     ChildData childData = cache.getCurrentData(targetPath);
     if (childData != null) {
-      HAGroupStoreRecord record = extractHAGroupStoreRecordOrNull(childData);
-      Stat currentStat = childData.getStat();
-      LOGGER.info("Built {} cluster record: {}", cacheType, record);
-      return Pair.of(record, currentStat);
+      Pair<HAGroupStoreRecord, Stat> recordAndStat = extractHAGroupStoreRecordOrNull(childData);
+      LOGGER.info("Built {} cluster record: {}", cacheType, recordAndStat.getLeft());
+      return recordAndStat;
     }
     return Pair.of(null, null);
   }
 
-  private HAGroupStoreRecord extractHAGroupStoreRecordOrNull(final ChildData childData) {
+  private Pair<HAGroupStoreRecord, Stat>
+    extractHAGroupStoreRecordOrNull(final ChildData childData) {
     if (childData != null) {
       byte[] data = childData.getData();
-      return HAGroupStoreRecord.fromJson(data).orElse(null);
+      return Pair.of(HAGroupStoreRecord.fromJson(data).orElse(null), childData.getStat());
     }
-    return null;
+    return Pair.of(null, null);
   }
 
   /**
@@ -613,4 +658,117 @@ public class HAGroupStoreClient implements Closeable {
     return ((System.currentTimeMillis() - currentHAGroupStoreRecordMtime) > waitTime);
   }
 
+  // ========== HA Group State Change Subscription Methods ==========
+
+  /**
+   * Subscribe to be notified when any transition to a target state occurs.
+   * @param targetState the target state to watch for
+   * @param clusterType whether to monitor local or peer cluster
+   * @param listener    the listener to notify when any transition to the target state occurs
+   */
+  public void subscribeToTargetState(HAGroupState targetState, ClusterType clusterType,
+    HAGroupStateListener listener) {
+    String key = buildTargetStateKey(clusterType, targetState);
+    targetStateSubscribers.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>()).add(listener);
+    LOGGER.info("Subscribed listener to target state {} for HA group {} on {} cluster", targetState,
+      haGroupName, clusterType);
+  }
+
+  /**
+   * Unsubscribe from target state notifications.
+   * @param targetState the target state
+   * @param clusterType whether monitoring local or peer cluster
+   * @param listener    the listener to remove
+   */
+  public void unsubscribeFromTargetState(HAGroupState targetState, ClusterType clusterType,
+    HAGroupStateListener listener) {
+    String key = buildTargetStateKey(clusterType, targetState);
+    CopyOnWriteArraySet<HAGroupStateListener> listeners = targetStateSubscribers.get(key);
+    if (listeners != null && listeners.remove(listener)) {
+      if (listeners.isEmpty()) {
+        targetStateSubscribers.remove(key);
+      }
+      LOGGER.info("Unsubscribed listener from target state {} for HA group {} on {} cluster",
+        targetState, haGroupName, clusterType);
+    }
+  }
+
+  /**
+   * Handle state change detection and notify subscribers if a transition occurred.
+   * @param newRecord the new HA group store record
+   * @param cacheType the type of cache (LOCAL or PEER)
+   */
+  private void handleStateChange(HAGroupStoreRecord newRecord, Stat newStat,
+    ClusterType cacheType) {
+    HAGroupState newState = newRecord.getHAGroupState();
+    HAGroupState oldState;
+    ClusterType clusterType;
+
+    if (ClusterType.LOCAL.equals(cacheType)) {
+      oldState = lastKnownLocalState;
+      lastKnownLocalState = newState;
+      clusterType = ClusterType.LOCAL;
+    } else {
+      oldState = lastKnownPeerState;
+      lastKnownPeerState = newState;
+      clusterType = ClusterType.PEER;
+    }
+
+    // Only notify if there's an actual state transition or initial state
+    if (oldState == null || !oldState.equals(newState)) {
+      LOGGER.info("Detected state transition for HA group {} from {} to {} on 
{} cluster",
+        haGroupName, oldState, newState, clusterType);
+      notifySubscribers(oldState, newState, newStat.getMtime(), clusterType);
+    }
+  }
+
+  /**
+   * Notify all relevant subscribers of a state transition.
+   * @param fromState   the state transitioned from
+   * @param toState     the state transitioned to
+   * @param clusterType the cluster type where the transition occurred
+   */
+  private void notifySubscribers(HAGroupState fromState, HAGroupState toState, long modifiedTime,
+    ClusterType clusterType) {
+    LOGGER.debug(
+      "Notifying subscribers of state transition for HA group {} from {} to {} on {} " + "cluster",
+      haGroupName, fromState, toState, clusterType);
+    String targetStateKey = buildTargetStateKey(clusterType, toState);
+
+    // Collect all listeners that need to be notified
+    Set<HAGroupStateListener> listenersToNotify = new HashSet<>();
+
+    // Find target state subscribers
+    CopyOnWriteArraySet<HAGroupStateListener> targetListeners =
+      targetStateSubscribers.get(targetStateKey);
+    if (targetListeners != null) {
+      listenersToNotify.addAll(targetListeners);
+    }
+
+    // Notify all listeners with error isolation
+    if (!listenersToNotify.isEmpty()) {
+      LOGGER.info("Notifying {} listeners of state transitionfor HA group {} 
from {} to {} on {} "
+        + "cluster", listenersToNotify.size(), haGroupName, fromState, 
toState, clusterType);
+
+      for (HAGroupStateListener listener : listenersToNotify) {
+        try {
+          listener.onStateChange(haGroupName, toState, modifiedTime, 
clusterType);
+        } catch (Exception e) {
+          LOGGER.error("Error notifying listener of state transition for HA 
group {} from {} to "
+            + "{} on {} cluster", haGroupName, fromState, toState, 
clusterType, e);
+          // Continue notifying other listeners
+        }
+      }
+    }
+  }
+
+  // ========== Helper Methods ==========
+
+  /**
+   * Build key for target state subscriptions.
+   */
+  private String buildTargetStateKey(ClusterType clusterType, HAGroupState targetState) {
+    return clusterType + ":" + targetState;
+  }
+
 }
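
To make the subscription bookkeeping above concrete, a hedged usage sketch against the client-level API (the group name, configuration, ZK URL, and LOGGER are placeholders, not values from this commit; checked exceptions are elided):

    // Subscribers are stored per client instance under "clusterType:targetState",
    // e.g. "PEER:ACTIVE_NOT_IN_SYNC", and are notified from the PathChildrenCache listener
    // whenever handleStateChange() detects that the cached state actually changed.
    HAGroupStoreClient client =
      HAGroupStoreClient.getInstanceForZkUrl(conf, "exampleGroup", zkUrl);
    HAGroupStateListener listener = (group, toState, mtime, cluster) ->
      LOGGER.info("{} reached {} on {} cluster at {}", group, toState, cluster, mtime);
    client.subscribeToTargetState(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
      ClusterType.PEER, listener);
    // ... later, when notifications are no longer needed:
    client.unsubscribeFromTargetState(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
      ClusterType.PEER, listener);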
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
index 1a7fe0985b..11a5ce7a40 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -37,7 +37,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of HAGroupStoreManager that uses HAGroupStoreClient. Manages all
- * HAGroupStoreClient instances.
+ * HAGroupStoreClient instances and provides passthrough functionality for HA group state change
+ * notifications.
  */
 public class HAGroupStoreManager {
   private static volatile HAGroupStoreManager haGroupStoreManagerInstance;
@@ -99,16 +100,12 @@ public class HAGroupStoreManager {
    * @return true if mutation is blocked, false otherwise.
    * @throws IOException when HAGroupStoreClient is not healthy.
    */
-  public boolean isMutationBlocked(String haGroupName) throws IOException, SQLException {
+  public boolean isMutationBlocked(String haGroupName) throws IOException {
     if (mutationBlockEnabled) {
-      HAGroupStoreClient haGroupStoreClient =
-        HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl);
-      if (haGroupStoreClient != null) {
-        return haGroupStoreClient.getHAGroupStoreRecord() != null
-          && haGroupStoreClient.getHAGroupStoreRecord().getClusterRole() != null
-          && haGroupStoreClient.getHAGroupStoreRecord().getClusterRole().isMutationBlocked();
-      }
-      throw new IOException("HAGroupStoreClient is not initialized");
+      HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+      HAGroupStoreRecord recordWithMetadata = haGroupStoreClient.getHAGroupStoreRecord();
+      return recordWithMetadata != null && recordWithMetadata.getClusterRole() != null
+        && recordWithMetadata.getClusterRole().isMutationBlocked();
     }
     return false;
   }
@@ -146,13 +143,8 @@ public class HAGroupStoreManager {
    */
   public void invalidateHAGroupStoreClient(final String haGroupName, boolean broadcastUpdate)
     throws Exception {
-    HAGroupStoreClient haGroupStoreClient =
-      HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl);
-    if (haGroupStoreClient != null) {
-      haGroupStoreClient.rebuild();
-    } else {
-      throw new IOException("HAGroupStoreClient is not initialized");
-    }
+    HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+    haGroupStoreClient.rebuild();
   }
 
   /**
@@ -163,13 +155,22 @@ public class HAGroupStoreManager {
    * @throws IOException when HAGroupStoreClient is not healthy.
    */
   public Optional<HAGroupStoreRecord> getHAGroupStoreRecord(final String haGroupName)
-    throws IOException, SQLException {
-    HAGroupStoreClient haGroupStoreClient =
-      HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl);
-    if (haGroupStoreClient != null) {
-      return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecord());
-    }
-    throw new IOException("HAGroupStoreClient is not initialized");
+    throws IOException {
+    HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+    return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecord());
+  }
+
+  /**
+   * Returns the HAGroupStoreRecord for a specific HA group from peer cluster.
+   * @param haGroupName name of the HA group
+   * @return Optional HAGroupStoreRecord for the HA group from peer cluster; can be empty if the HA
+   *         group is not found or peer cluster is not available.
+   * @throws IOException when HAGroupStoreClient is not healthy.
+   */
+  public Optional<HAGroupStoreRecord> getPeerHAGroupStoreRecord(final String haGroupName)
+    throws IOException {
+    HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+    return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecordFromPeer());
   }
 
   /**
@@ -178,15 +179,9 @@ public class HAGroupStoreManager {
    * @throws IOException when HAGroupStoreClient is not healthy.
    */
   public void setHAGroupStatusToStoreAndForward(final String haGroupName) throws IOException,
-    StaleHAGroupStoreRecordVersionException, InvalidClusterRoleTransitionException, SQLException {
-    HAGroupStoreClient haGroupStoreClient =
-      HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl);
-    if (haGroupStoreClient != null) {
-      haGroupStoreClient
-        .setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
-    } else {
-      throw new IOException("HAGroupStoreClient is not initialized");
-    }
+    StaleHAGroupStoreRecordVersionException, InvalidClusterRoleTransitionException {
+    HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+    haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
   }
 
   /**
@@ -194,15 +189,65 @@ public class HAGroupStoreManager {
    * @param haGroupName name of the HA group
    * @throws IOException when HAGroupStoreClient is not healthy.
    */
-  public void setHAGroupStatusRecordToSync(final String haGroupName) throws IOException,
-    StaleHAGroupStoreRecordVersionException, InvalidClusterRoleTransitionException, SQLException {
-    HAGroupStoreClient haGroupStoreClient =
-      HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl);
-    if (haGroupStoreClient != null) {
-      haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
-    } else {
-      throw new IOException("HAGroupStoreClient is not initialized");
+  public void setHAGroupStatusToSync(final String haGroupName) throws IOException,
+    StaleHAGroupStoreRecordVersionException, InvalidClusterRoleTransitionException {
+    HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+    haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+  }
+
+  /**
+   * Sets the HAGroupStoreRecord to degrade reader functionality in local cluster. Transitions from
+   * STANDBY to DEGRADED_STANDBY_FOR_READER or from DEGRADED_STANDBY_FOR_WRITER to DEGRADED_STANDBY.
+   * @param haGroupName name of the HA group
+   * @throws IOException                           when HAGroupStoreClient is not healthy.
+   * @throws InvalidClusterRoleTransitionException when the current state cannot transition to a
+   *                                               degraded reader state
+   */
+  public void setReaderToDegraded(final String haGroupName) throws IOException,
+    StaleHAGroupStoreRecordVersionException, InvalidClusterRoleTransitionException {
+    HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+    HAGroupStoreRecord currentRecord = haGroupStoreClient.getHAGroupStoreRecord();
+
+    if (currentRecord == null) {
+      throw new IOException("Current HAGroupStoreRecord is null for HA group: " + haGroupName);
+    }
+
+    HAGroupStoreRecord.HAGroupState currentState = currentRecord.getHAGroupState();
+    HAGroupStoreRecord.HAGroupState targetState =
+      HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER;
+
+    if (currentState == HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER) {
+      targetState = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY;
     }
+
+    haGroupStoreClient.setHAGroupStatusIfNeeded(targetState);
+  }
+
+  /**
+   * Sets the HAGroupStoreRecord to restore reader functionality in local cluster. Transitions from
+   * DEGRADED_STANDBY_FOR_READER to STANDBY or from DEGRADED_STANDBY to DEGRADED_STANDBY_FOR_WRITER.
+   * @param haGroupName name of the HA group
+   * @throws IOException                           when HAGroupStoreClient is not healthy.
+   * @throws InvalidClusterRoleTransitionException when the current state cannot transition to a
+   *                                               healthy reader state
+   */
+  public void setReaderToHealthy(final String haGroupName) throws IOException,
+    StaleHAGroupStoreRecordVersionException, InvalidClusterRoleTransitionException {
+    HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+    HAGroupStoreRecord currentRecord = haGroupStoreClient.getHAGroupStoreRecord();
+
+    if (currentRecord == null) {
+      throw new IOException("Current HAGroupStoreRecord is null for HA group: " + haGroupName);
+    }
+
+    HAGroupStoreRecord.HAGroupState currentState = currentRecord.getHAGroupState();
+    HAGroupStoreRecord.HAGroupState targetState = HAGroupStoreRecord.HAGroupState.STANDBY;
+
+    if (currentState == HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY) {
+      targetState = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER;
+    }
+
+    haGroupStoreClient.setHAGroupStatusIfNeeded(targetState);
   }
 
   /**
@@ -212,13 +257,62 @@ public class HAGroupStoreManager {
    * @return ClusterRoleRecord for the cluster pair
    * @throws IOException when HAGroupStoreClient is not healthy.
    */
-  public ClusterRoleRecord getClusterRoleRecord(String haGroupName)
-    throws IOException, SQLException {
+  public ClusterRoleRecord getClusterRoleRecord(String haGroupName) throws IOException {
+    HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+    return haGroupStoreClient.getClusterRoleRecord();
+  }
+
+  /**
+   * Subscribe to be notified when any transition to a target state occurs.
+   * @param haGroupName the name of the HA group to monitor
+   * @param targetState the target state to watch for
+   * @param clusterType whether to monitor local or peer cluster
+   * @param listener    the listener to notify when any transition to the target state occurs
+   * @throws IOException if unable to get HAGroupStoreClient instance
+   */
+  public void subscribeToTargetState(String haGroupName,
+    HAGroupStoreRecord.HAGroupState targetState, ClusterType clusterType,
+    HAGroupStateListener listener) throws IOException {
+    HAGroupStoreClient client = getHAGroupStoreClient(haGroupName);
+    client.subscribeToTargetState(targetState, clusterType, listener);
+    LOGGER.debug(
+      "Delegated subscription to target state {} " + "for HA group {} on {} cluster to client",
+      targetState, haGroupName, clusterType);
+  }
+
+  /**
+   * Unsubscribe from target state notifications.
+   * @param haGroupName the name of the HA group
+   * @param targetState the target state
+   * @param clusterType whether monitoring local or peer cluster
+   * @param listener    the listener to remove
+   */
+  public void unsubscribeFromTargetState(String haGroupName,
+    HAGroupStoreRecord.HAGroupState targetState, ClusterType clusterType,
+    HAGroupStateListener listener) {
+    try {
+      HAGroupStoreClient client = getHAGroupStoreClient(haGroupName);
+      client.unsubscribeFromTargetState(targetState, clusterType, listener);
+      LOGGER.debug("Delegated unsubscription from target state {} "
+        + "for HA group {} on {} cluster to client", targetState, haGroupName, 
clusterType);
+    } catch (IOException e) {
+      LOGGER.warn("HAGroupStoreClient not found for HA group: {} - cannot 
unsubscribe: {}",
+        haGroupName, e.getMessage());
+    }
+  }
+
+  /**
+   * Helper method to get HAGroupStoreClient instance with consistent error handling.
+   * @param haGroupName name of the HA group
+   * @return HAGroupStoreClient instance for the specified HA group
+   * @throws IOException when HAGroupStoreClient is not initialized
+   */
+  private HAGroupStoreClient getHAGroupStoreClient(final String haGroupName) throws IOException {
     HAGroupStoreClient haGroupStoreClient =
       HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl);
-    if (haGroupStoreClient != null) {
-      return haGroupStoreClient.getClusterRoleRecord();
+    if (haGroupStoreClient == null) {
+      throw new IOException("HAGroupStoreClient is not initialized for HA group: " + haGroupName);
     }
-    throw new IOException("HAGroupStoreClient is not initialized");
+    return haGroupStoreClient;
   }
 }
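
A hedged usage sketch of the manager-level passthrough API added above (the HA group name is a placeholder, checked exceptions are elided, and LOGGER is assumed to exist in the caller):

    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(conf);
    // Passthrough subscription: the manager resolves the HAGroupStoreClient and delegates to it.
    manager.subscribeToTargetState("exampleGroup", HAGroupStoreRecord.HAGroupState.STANDBY,
      ClusterType.PEER, (group, toState, mtime, cluster) -> LOGGER.info("{} peer reached {}", group, toState));
    // Peer record lookup added in this change; empty if the group or peer cluster is unavailable.
    Optional<HAGroupStoreRecord> peerRecord = manager.getPeerHAGroupStoreRecord("exampleGroup");
    // Reader health helpers: STANDBY <-> DEGRADED_STANDBY_FOR_READER and
    // DEGRADED_STANDBY_FOR_WRITER <-> DEGRADED_STANDBY.
    manager.setReaderToDegraded("exampleGroup");
    manager.setReaderToHealthy("exampleGroup");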
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
index e3e1a4a628..4077a4b819 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
@@ -152,17 +152,27 @@ public class HAGroupStoreRecord {
   private final String protocolVersion;
   private final String haGroupName;
   private final HAGroupState haGroupState;
+  private final Long lastSyncStateTimeInMs;
 
   @JsonCreator
   public HAGroupStoreRecord(@JsonProperty("protocolVersion") String protocolVersion,
     @JsonProperty("haGroupName") String haGroupName,
-    @JsonProperty("haGroupState") HAGroupState haGroupState) {
+    @JsonProperty("haGroupState") HAGroupState haGroupState,
+    @JsonProperty("lastSyncStateTimeInMs") Long lastSyncStateTimeInMs) {
     Preconditions.checkNotNull(haGroupName, "HA group name cannot be null!");
     Preconditions.checkNotNull(haGroupState, "HA group state cannot be null!");
 
     this.protocolVersion = Objects.toString(protocolVersion, DEFAULT_PROTOCOL_VERSION);
     this.haGroupName = haGroupName;
     this.haGroupState = haGroupState;
+    this.lastSyncStateTimeInMs = lastSyncStateTimeInMs;
+  }
+
+  /**
+   * Convenience constructor for backward compatibility without lastSyncStateTimeInMs.
+   */
+  public HAGroupStoreRecord(String protocolVersion, String haGroupName, HAGroupState haGroupState) {
+    this(protocolVersion, haGroupName, haGroupState, null);
   }
 
   public static Optional<HAGroupStoreRecord> fromJson(byte[] bytes) {
@@ -183,7 +193,8 @@ public class HAGroupStoreRecord {
 
   public boolean hasSameInfo(HAGroupStoreRecord other) {
     return haGroupName.equals(other.haGroupName) && haGroupState.equals(other.haGroupState)
-      && protocolVersion.equals(other.protocolVersion);
+      && protocolVersion.equals(other.protocolVersion)
+      && Objects.equals(lastSyncStateTimeInMs, other.lastSyncStateTimeInMs);
   }
 
   public String getProtocolVersion() {
@@ -199,6 +210,10 @@ public class HAGroupStoreRecord {
     return haGroupState;
   }
 
+  public Long getLastSyncStateTimeInMs() {
+    return lastSyncStateTimeInMs;
+  }
+
   @JsonIgnore
   public ClusterRoleRecord.ClusterRole getClusterRole() {
     return haGroupState.getClusterRole();
@@ -207,7 +222,7 @@ public class HAGroupStoreRecord {
   @Override
   public int hashCode() {
     return new HashCodeBuilder().append(protocolVersion).append(haGroupName).append(haGroupState)
-      .hashCode();
+      .append(lastSyncStateTimeInMs).hashCode();
   }
 
   @Override
@@ -222,14 +237,15 @@ public class HAGroupStoreRecord {
       HAGroupStoreRecord otherRecord = (HAGroupStoreRecord) other;
       return new EqualsBuilder().append(protocolVersion, otherRecord.protocolVersion)
         .append(haGroupName, otherRecord.haGroupName).append(haGroupState, otherRecord.haGroupState)
-        .isEquals();
+        .append(lastSyncStateTimeInMs, otherRecord.lastSyncStateTimeInMs).isEquals();
     }
   }
 
   @Override
   public String toString() {
     return "HAGroupStoreRecord{" + "protocolVersion='" + protocolVersion + 
'\'' + ", haGroupName='"
-      + haGroupName + '\'' + ", haGroupState=" + haGroupState + '}';
+      + haGroupName + '\'' + ", haGroupState=" + haGroupState + ", 
lastSyncStateTimeInMs="
+      + lastSyncStateTimeInMs + '}';
   }
 
   public String toPrettyString() {
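
A small illustrative sketch of the widened record constructor and the backward-compatible overload introduced above (the group name and protocol version strings are placeholders):

    // New four-arg constructor carries the last time the group was known to be in sync.
    HAGroupStoreRecord withSyncTime = new HAGroupStoreRecord("1.0", "exampleGroup",
      HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, System.currentTimeMillis());
    // Existing three-arg callers keep compiling; lastSyncStateTimeInMs defaults to null.
    HAGroupStoreRecord legacy = new HAGroupStoreRecord("1.0", "exampleGroup",
      HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
    // equals(), hashCode(), hasSameInfo() and toString() now include lastSyncStateTimeInMs.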
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
index e34513b135..9f7a310821 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.end2end;
 
-import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -85,7 +85,7 @@ public class PhoenixRegionServerEndpointITWithConsistentFailover extends BaseTes
     ServerRpcController controller = new ServerRpcController();
 
     try (PhoenixHAAdmin peerHAAdmin = new PhoenixHAAdmin(
-      CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_NAMESPACE)) {
+      CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE)) {
       HAGroupStoreRecord peerHAGroupStoreRecord = new HAGroupStoreRecord(
         HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, HAGroupState.STANDBY);
       peerHAAdmin.createHAGroupStoreRecordInZooKeeper(peerHAGroupStoreRecord);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
index c24e15b2ab..4dc59e9395 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.end2end.index;
 
-import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
 import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
 import static org.junit.Assert.assertEquals;
@@ -76,7 +76,7 @@ public class IndexRegionObserverMutationBlockingIT extends BaseTest {
 
   @Before
   public void setUp() throws Exception {
-    haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_NAMESPACE);
+    haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
     Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
     String zkUrl = getLocalZkUrl(config);
     String peerZkUrl = CLUSTERS.getZkUrl2();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java
new file mode 100644
index 0000000000..a55302ffa5
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.util.HAGroupStoreTestUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * Integration tests for HA Group State Change subscription functionality. Tests the new
+ * subscription system where HAGroupStoreClient directly manages subscriptions and
+ * HAGroupStoreManager acts as a passthrough.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class HAGroupStateSubscriptionIT extends BaseTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStateSubscriptionIT.class);
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private PhoenixHAAdmin haAdmin;
+  private PhoenixHAAdmin peerHaAdmin;
+  private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 2000L;
+  private String zkUrl;
+  private String peerZKUrl;
+  private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS =
+    new HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+    props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    CLUSTERS.start();
+  }
+
+  @Before
+  public void before() throws Exception {
+    haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+    zkUrl = getLocalZkUrl(config);
+    this.peerZKUrl = CLUSTERS.getZkUrl2();
+    peerHaAdmin = new PhoenixHAAdmin(peerZKUrl, config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+
+    // Clean up existing HAGroupStoreRecords
+    try {
+      List<String> haGroupNames = HAGroupStoreClient.getHAGroupNames(zkUrl);
+      for (String haGroupName : haGroupNames) {
+        haAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName));
+        peerHaAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName));
+      }
+
+    } catch (Exception e) {
+      // Ignore cleanup errors
+    }
+    // Remove any existing entries in the system table
+    HAGroupStoreTestUtil.deleteAllHAGroupRecordsInSystemTable(zkUrl);
+
+    // Insert a HAGroupStoreRecord into the system table
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), zkUrl,
+      peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, null);
+  }
+
+  // ========== Multi-Cluster & Basic Subscription Tests ==========
+
+  @Test
+  public void testDifferentTargetStatesPerCluster() throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+    // Track notifications
+    AtomicInteger localNotifications = new AtomicInteger(0);
+    AtomicInteger peerNotifications = new AtomicInteger(0);
+    AtomicReference<ClusterType> lastLocalClusterType = new 
AtomicReference<>();
+    AtomicReference<ClusterType> lastPeerClusterType = new AtomicReference<>();
+
+    // Create listeners for different target states
+    HAGroupStateListener localListener = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == 
ClusterType.LOCAL) {
+        localNotifications.incrementAndGet();
+        lastLocalClusterType.set(clusterType);
+        LOGGER.info("Local target state listener called: {} on {}", toState, 
clusterType);
+      }
+    };
+
+    HAGroupStateListener peerListener = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.ACTIVE_NOT_IN_SYNC && clusterType == 
ClusterType.PEER) {
+        peerNotifications.incrementAndGet();
+        lastPeerClusterType.set(clusterType);
+        LOGGER.info("Peer target state listener called: {} on {}", toState, 
clusterType);
+      }
+    };
+
+    // Subscribe to different target states on different clusters
+    manager.subscribeToTargetState(haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE, ClusterType.LOCAL,
+      localListener);
+    manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_NOT_IN_SYNC, ClusterType.PEER,
+      peerListener);
+
+    // Trigger transition to STANDBY_TO_ACTIVE on LOCAL cluster
+    HAGroupStoreRecord localRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE);
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Trigger transition to STANDBY on PEER cluster
+    HAGroupStoreRecord peerRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupState.ACTIVE_NOT_IN_SYNC);
+    peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Verify no cross-cluster triggering
+    assertEquals("Local cluster should receive its target state notification", 
1,
+      localNotifications.get());
+    assertEquals("Peer cluster should receive its target state notification", 
1,
+      peerNotifications.get());
+    assertEquals("Local notification should have LOCAL cluster type", 
ClusterType.LOCAL,
+      lastLocalClusterType.get());
+    assertEquals("Peer notification should have PEER cluster type", 
ClusterType.PEER,
+      lastPeerClusterType.get());
+
+  }
+
+  @Test
+  public void testUnsubscribeSpecificCluster() throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+    // Track notifications
+    AtomicInteger totalNotifications = new AtomicInteger(0);
+    AtomicReference<ClusterType> lastClusterType = new AtomicReference<>();
+
+    HAGroupStateListener listener = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.STANDBY) {
+        totalNotifications.incrementAndGet();
+        lastClusterType.set(clusterType);
+        LOGGER.info("Listener called: {} on {}", toState, clusterType);
+      }
+    };
+
+    // Subscribe to same target state on both clusters
+    manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.LOCAL, listener);
+    manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.PEER, listener);
+
+    // Unsubscribe from LOCAL cluster only
+    manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.LOCAL,
+      listener);
+
+    // Trigger transition to STANDBY on LOCAL → should NOT call listener
+    HAGroupStoreRecord localRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY);
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    assertEquals("Should receive no notifications from LOCAL cluster", 0, 
totalNotifications.get());
+
+    // Trigger transition to STANDBY on PEER → should call listener
+    HAGroupStoreRecord peerRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY);
+    peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    assertEquals("Should receive notification only from PEER cluster", 1, 
totalNotifications.get());
+    assertEquals("Notification should be from PEER cluster", ClusterType.PEER,
+      lastClusterType.get());
+
+  }
+
+  // ========== Multiple Listeners Tests ==========
+
+  @Test
+  public void testMultipleListenersMultipleClusters() throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+    // Track notifications from multiple listeners
+    AtomicInteger listener1LocalNotifications = new AtomicInteger(0);
+    AtomicInteger listener2LocalNotifications = new AtomicInteger(0);
+    AtomicInteger listener1PeerNotifications = new AtomicInteger(0);
+    AtomicInteger listener2PeerNotifications = new AtomicInteger(0);
+
+    HAGroupStateListener listener1 = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.DEGRADED_STANDBY) {
+        if (clusterType == ClusterType.LOCAL) {
+          listener1LocalNotifications.incrementAndGet();
+        } else {
+          listener1PeerNotifications.incrementAndGet();
+        }
+        LOGGER.info("Listener1 called: {} on {}", toState, clusterType);
+      }
+    };
+
+    HAGroupStateListener listener2 = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.DEGRADED_STANDBY) {
+        if (clusterType == ClusterType.LOCAL) {
+          listener2LocalNotifications.incrementAndGet();
+        } else {
+          listener2PeerNotifications.incrementAndGet();
+        }
+        LOGGER.info("Listener2 called: {} on {}", toState, clusterType);
+      }
+    };
+
+    // Register multiple listeners for same target state on both clusters
+    manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY, 
ClusterType.LOCAL,
+      listener1);
+    manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY, 
ClusterType.LOCAL,
+      listener2);
+    manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY, 
ClusterType.PEER,
+      listener1);
+    manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY, 
ClusterType.PEER,
+      listener2);
+
+    // Trigger transition to DEGRADED_STANDBY on LOCAL
+    HAGroupStoreRecord localRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupState.DEGRADED_STANDBY);
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Trigger transition to DEGRADED_STANDBY on PEER
+    HAGroupStoreRecord peerRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupState.DEGRADED_STANDBY);
+    peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Verify all listeners called for each cluster
+    assertEquals("Listener1 should receive LOCAL notification", 1,
+      listener1LocalNotifications.get());
+    assertEquals("Listener2 should receive LOCAL notification", 1,
+      listener2LocalNotifications.get());
+    assertEquals("Listener1 should receive PEER notification", 1, 
listener1PeerNotifications.get());
+    assertEquals("Listener2 should receive PEER notification", 1, 
listener2PeerNotifications.get());
+
+  }
+
+  @Test
+  public void testSameListenerDifferentTargetStates() throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+    // Track which target states were reached
+    AtomicInteger stateANotifications = new AtomicInteger(0);
+    AtomicInteger stateBNotifications = new AtomicInteger(0);
+    AtomicReference<ClusterType> lastStateAClusterType = new 
AtomicReference<>();
+    AtomicReference<ClusterType> lastStateBClusterType = new 
AtomicReference<>();
+
+    HAGroupStateListener sharedListener = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY && clusterType == 
ClusterType.LOCAL) {
+        stateANotifications.incrementAndGet();
+        lastStateAClusterType.set(clusterType);
+        LOGGER.info("Shared listener - Target State A: {} on {}", toState, 
clusterType);
+      } else if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == 
ClusterType.PEER) {
+        stateBNotifications.incrementAndGet();
+        lastStateBClusterType.set(clusterType);
+        LOGGER.info("Shared listener - Target State B: {} on {}", toState, 
clusterType);
+      }
+    };
+
+    // Register same listener for different target states on different clusters
+    manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
+      ClusterType.LOCAL, sharedListener);
+    manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, 
ClusterType.PEER,
+      sharedListener);
+
+    // Trigger target state A on LOCAL
+    HAGroupStoreRecord localRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Trigger target state B on PEER
+    HAGroupStoreRecord peerRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+    peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Verify listener called for each appropriate target state/cluster 
combination
+    assertEquals("Should receive target state A notification", 1, 
stateANotifications.get());
+    assertEquals("Should receive target state B notification", 1, 
stateBNotifications.get());
+    assertEquals("Target state A should be from LOCAL cluster", 
ClusterType.LOCAL,
+      lastStateAClusterType.get());
+    assertEquals("Target state B should be from PEER cluster", 
ClusterType.PEER,
+      lastStateBClusterType.get());
+
+  }
+
+  // ========== Edge Cases & Error Handling ==========
+
+  @Test
+  public void testSubscriptionToNonExistentHAGroup() throws Exception {
+    String nonExistentHAGroup = "nonExistentGroup_" + testName.getMethodName();
+    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+    HAGroupStateListener listener = (groupName, toState, modifiedTime, 
clusterType) -> {
+      // Should not be called
+    };
+
+    // Try to subscribe to non-existent HA group
+    try {
+      manager.subscribeToTargetState(nonExistentHAGroup, HAGroupState.STANDBY, 
ClusterType.LOCAL,
+        listener);
+      fail("Expected IOException for non-existent HA group");
+    } catch (IOException e) {
+      assertTrue("Exception should mention the HA group name",
+        e.getMessage().contains(nonExistentHAGroup));
+      LOGGER.info("Correctly caught exception for non-existent HA group: {}", 
e.getMessage());
+    }
+  }
+
+  @Test
+  public void testListenerExceptionIsolation() throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+    // Track notifications
+    AtomicInteger goodListener1Notifications = new AtomicInteger(0);
+    AtomicInteger goodListener2Notifications = new AtomicInteger(0);
+    AtomicInteger badListenerCalls = new AtomicInteger(0);
+
+    HAGroupStateListener goodListener1 = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.ACTIVE_IN_SYNC) {
+        goodListener1Notifications.incrementAndGet();
+        LOGGER.info("Good listener 1 called: {} on {}", toState, clusterType);
+      }
+    };
+
+    HAGroupStateListener badListener = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.ACTIVE_IN_SYNC) {
+        badListenerCalls.incrementAndGet();
+        LOGGER.info("Bad listener called, about to throw exception");
+        throw new RuntimeException("Test exception from bad listener");
+      }
+    };
+
+    HAGroupStateListener goodListener2 = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.ACTIVE_IN_SYNC) {
+        goodListener2Notifications.incrementAndGet();
+        LOGGER.info("Good listener 2 called: {} on {}", toState, clusterType);
+      }
+    };
+
+    // Register listeners - bad listener in the middle
+    manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, 
ClusterType.LOCAL,
+      goodListener1);
+    manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, 
ClusterType.LOCAL,
+      badListener);
+    manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, 
ClusterType.LOCAL,
+      goodListener2);
+
+    // Trigger transition to target state
+    HAGroupStoreRecord transitionRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, transitionRecord, 
0);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Verify all listeners were called despite exception in bad listener
+    assertEquals("Good listener 1 should be called", 1, 
goodListener1Notifications.get());
+    assertEquals("Good listener 2 should be called", 1, 
goodListener2Notifications.get());
+    assertEquals("Bad listener should be called", 1, badListenerCalls.get());
+  }
+
+  // ========== Performance & Concurrency Tests ==========
+
+  @Test
+  public void testConcurrentMultiClusterSubscriptions() throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+    final int threadCount = 10;
+    final CountDownLatch startLatch = new CountDownLatch(1);
+    final CountDownLatch completionLatch = new CountDownLatch(threadCount);
+    final AtomicInteger successfulSubscriptions = new AtomicInteger(0);
+
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+    // Create concurrent subscription tasks
+    for (int i = 0; i < threadCount; i++) {
+      final int threadIndex = i;
+      executor.submit(() -> {
+        try {
+          startLatch.await(); // Wait for all threads to be ready
+
+          HAGroupStateListener listener = (groupName, toState, modifiedTime, 
clusterType) -> {
+            LOGGER.debug("Thread {} listener called: {} on {}", threadIndex, 
toState, clusterType);
+          };
+
+          // Half subscribe to LOCAL, half to PEER
+          ClusterType clusterType = (threadIndex % 2 == 0) ? ClusterType.LOCAL 
: ClusterType.PEER;
+
+          manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, clusterType,
+            listener);
+          successfulSubscriptions.incrementAndGet();
+
+          // Also test unsubscribe
+          manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, clusterType,
+            listener);
+
+        } catch (Exception e) {
+          LOGGER.error("Thread {} failed", threadIndex, e);
+        } finally {
+          completionLatch.countDown();
+        }
+      });
+    }
+
+    // Start all threads simultaneously
+    startLatch.countDown();
+
+    // Wait for completion
+    assertTrue("All threads should complete within timeout",
+      completionLatch.await(30, TimeUnit.SECONDS));
+    assertEquals("All threads should successfully subscribe", threadCount,
+      successfulSubscriptions.get());
+
+    executor.shutdown();
+  }
+
+  @Test
+  public void testHighFrequencyMultiClusterChanges() throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+    // Track notifications
+    AtomicInteger localNotifications = new AtomicInteger(0);
+    AtomicInteger peerNotifications = new AtomicInteger(0);
+
+    HAGroupStateListener listener = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (clusterType == ClusterType.LOCAL) {
+        localNotifications.incrementAndGet();
+      } else {
+        peerNotifications.incrementAndGet();
+      }
+      LOGGER.debug("High frequency listener: {} on {}", toState, clusterType);
+    };
+
+    // Subscribe to target state on both clusters
+    manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.LOCAL, listener);
+    manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.PEER, listener);
+
+    // Rapidly alternate state changes on both clusters
+    final int changeCount = 5;
+    HAGroupStoreRecord initialPeerRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupState.DEGRADED_STANDBY_FOR_WRITER);
+    peerHaAdmin.createHAGroupStoreRecordInZooKeeper(initialPeerRecord);
+
+    for (int i = 0; i < changeCount; i++) {
+      // Change local cluster
+      HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", 
haGroupName,
+        (i % 2 == 0) ? HAGroupState.STANDBY : 
HAGroupState.DEGRADED_STANDBY_FOR_READER);
+      haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 
-1);
+
+      // Change peer cluster
+      HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", 
haGroupName,
+        (i % 2 == 0) ? HAGroupState.STANDBY : 
HAGroupState.DEGRADED_STANDBY_FOR_WRITER);
+      peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerRecord, 
-1);
+
+      // Small delay between changes
+      Thread.sleep(500);
+    }
+
+    // Final wait for all events to propagate
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Verify transitions detected on both clusters
+    // Expected: 3 transitions to STANDBY state (i=0,2,4 → STANDBY)
+    assertEquals("Should detect exactly 3 local cluster transitions to 
STANDBY", 3,
+      localNotifications.get());
+    assertEquals("Should detect exactly 3 peer cluster transitions to 
STANDBY", 3,
+      peerNotifications.get());
+
+    LOGGER.info("Detected {} local and {} peer notifications", 
localNotifications.get(),
+      peerNotifications.get());
+
+  }
+
+  // ========== Cleanup & Resource Management Tests ==========
+
+  @Test
+  public void testSubscriptionCleanupPerCluster() throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+    // Track notifications to verify functionality
+    AtomicInteger localActiveNotifications = new AtomicInteger(0);
+    AtomicInteger peerActiveNotifications = new AtomicInteger(0);
+    AtomicInteger localStandbyNotifications = new AtomicInteger(0);
+    AtomicInteger peerStandbyNotifications = new AtomicInteger(0);
+
+    // Create listeners that track which ones are called
+    HAGroupStateListener listener1 = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == 
ClusterType.LOCAL) {
+        localActiveNotifications.incrementAndGet();
+        LOGGER.info("Listener1 LOCAL ACTIVE_IN_SYNC: {}", toState);
+      }
+    };
+
+    HAGroupStateListener listener2 = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == 
ClusterType.LOCAL) {
+        localActiveNotifications.incrementAndGet();
+        LOGGER.info("Listener2 LOCAL ACTIVE_IN_SYNC: {}", toState);
+      } else if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == 
ClusterType.PEER) {
+        peerActiveNotifications.incrementAndGet();
+        LOGGER.info("Listener2 PEER STANDBY_TO_ACTIVE: {}", toState);
+      }
+    };
+
+    HAGroupStateListener listener3 = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == 
ClusterType.PEER) {
+        peerActiveNotifications.incrementAndGet();
+        LOGGER.info("Listener3 PEER STANDBY_TO_ACTIVE: {}", toState);
+      }
+    };
+
+    HAGroupStateListener listener4 = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == 
ClusterType.LOCAL) {
+        localStandbyNotifications.incrementAndGet();
+        LOGGER.info("Listener4 LOCAL ACTIVE_IN_SYNC: {}", toState);
+      } else if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == 
ClusterType.PEER) {
+        peerStandbyNotifications.incrementAndGet();
+        LOGGER.info("Listener4 PEER STANDBY_TO_ACTIVE: {}", toState);
+      }
+    };
+
+    // Subscribe listeners to both clusters for target states
+    manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, 
ClusterType.LOCAL,
+      listener1);
+    manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, 
ClusterType.LOCAL,
+      listener2);
+    manager.subscribeToTargetState(haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER,
+      listener2);
+    manager.subscribeToTargetState(haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER,
+      listener3);
+    manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, 
ClusterType.LOCAL,
+      listener4);
+    manager.subscribeToTargetState(haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER,
+      listener4);
+
+    // Test initial functionality - trigger ACTIVE_IN_SYNC on LOCAL
+    HAGroupStoreRecord localActiveRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
localActiveRecord, 0);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Should have 2 notifications for LOCAL ACTIVE_IN_SYNC (listener1 + 
listener2)
+    assertEquals("Should have 2 LOCAL ACTIVE_IN_SYNC notifications initially", 
2,
+      localActiveNotifications.get());
+
+    // Test initial functionality - trigger STANDBY_TO_ACTIVE on PEER
+    HAGroupStoreRecord peerActiveRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE);
+    peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerActiveRecord);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Should have 2 notifications for PEER STANDBY_TO_ACTIVE (listener2 + 
listener3)
+    assertEquals("Should have 2 PEER STANDBY_TO_ACTIVE notifications 
initially", 2,
+      peerActiveNotifications.get());
+
+    // Reset counters for cleanup testing
+    localActiveNotifications.set(0);
+    peerActiveNotifications.set(0);
+
+    // Unsubscribe selectively
+    manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL,
+      listener1);
+    manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE,
+      ClusterType.PEER, listener2);
+    manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL,
+      listener4);
+
+    // Test after partial unsubscribe - trigger ACTIVE_IN_SYNC on LOCAL again 
by first changing to
+    // some other state.
+    HAGroupStoreRecord localActiveRecord2 =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupState.ACTIVE_NOT_IN_SYNC);
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
localActiveRecord2, 1);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    HAGroupStoreRecord localActiveRecord3 =
+      new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
localActiveRecord3, 2);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Should have only 1 notification for LOCAL ACTIVE_IN_SYNC (only 
listener2 remains)
+    assertEquals("Should have 1 LOCAL ACTIVE_IN_SYNC notification after 
partial unsubscribe", 1,
+      localActiveNotifications.get());
+
+    // Test after partial unsubscribe - trigger STANDBY_TO_ACTIVE on PEER again
+    HAGroupStoreRecord peerActiveRecord2 =
+      new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY);
+    peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
peerActiveRecord2, 0);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    HAGroupStoreRecord peerActiveRecord3 =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE);
+    peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
peerActiveRecord3, 1);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Should have only 1 notification for PEER STANDBY_TO_ACTIVE (only 
listener3 remains)
+    assertEquals("Should have 1 PEER STANDBY_TO_ACTIVE notification after 
partial unsubscribe", 1,
+      peerActiveNotifications.get());
+
+    // Reset counters again
+    localActiveNotifications.set(0);
+    peerActiveNotifications.set(0);
+
+    // Unsubscribe all remaining listeners
+    manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.ACTIVE_NOT_IN_SYNC,
+      ClusterType.LOCAL, listener2);
+    manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.ACTIVE_NOT_IN_SYNC,
+      ClusterType.PEER, listener3);
+    manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.PEER,
+      listener4);
+
+    // Test after complete unsubscribe - trigger ACTIVE_NOT_IN_SYNC on both 
clusters
+    HAGroupStoreRecord localActiveRecord4 =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupState.ACTIVE_NOT_IN_SYNC);
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
localActiveRecord4, 3);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    HAGroupStoreRecord peerActiveRecord4 =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupState.ACTIVE_NOT_IN_SYNC);
+    peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
peerActiveRecord4, 2);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Should have no notifications after complete unsubscribe
+    assertEquals("Should have 0 LOCAL ACTIVE_NOT_IN_SYNC notifications after 
complete unsubscribe",
+      0, localActiveNotifications.get());
+    assertEquals("Should have 0 PEER ACTIVE_NOT_IN_SYNC notifications after 
complete unsubscribe",
+      0, peerActiveNotifications.get());
+
+    // Test that new subscriptions still work properly
+    AtomicInteger newSubscriptionNotifications = new AtomicInteger(0);
+    HAGroupStateListener newTestListener = (groupName, toState, modifiedTime, 
clusterType) -> {
+      if (toState == HAGroupState.STANDBY && clusterType == ClusterType.LOCAL) 
{
+        newSubscriptionNotifications.incrementAndGet();
+        LOGGER.info("New subscription triggered: {} on {} at {}", toState, 
clusterType,
+          modifiedTime);
+      }
+    };
+
+    // Subscribe with new test listener
+    manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.LOCAL,
+      newTestListener);
+
+    // Trigger STANDBY state and verify new subscription works
+    HAGroupStoreRecord standbyRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY);
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, standbyRecord, 4);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Expected: exactly 1 notification for the new subscription
+    assertEquals("New subscription should receive exactly 1 notification", 1,
+      newSubscriptionNotifications.get());
+
+    LOGGER.info(
+      "Subscription cleanup test completed successfully with {} notifications 
from new subscription",
+      newSubscriptionNotifications.get());
+
+  }
+
+  /**
+   * Helper method to access private subscription maps via reflection
+   */
+  @SuppressWarnings("unchecked")
+  private ConcurrentHashMap<String, CopyOnWriteArraySet<HAGroupStateListener>>
+    getSubscriptionMap(HAGroupStoreClient client, String fieldName) throws 
Exception {
+    Field field = HAGroupStoreClient.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    return (ConcurrentHashMap<String, 
CopyOnWriteArraySet<HAGroupStateListener>>) field.get(client);
+  }
+}
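
For orientation, the subscription surface exercised by
HAGroupStateSubscriptionIT above is small: a listener is registered for a
(HA group, target HAGroupState, ClusterType) combination on
HAGroupStoreManager. A minimal usage sketch follows, assuming only the
signatures visible in this diff; the StandbyWatcher class and the println
are illustrative and not part of the commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.jdbc.ClusterType;
    import org.apache.phoenix.jdbc.HAGroupStateListener;
    import org.apache.phoenix.jdbc.HAGroupStoreManager;
    import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;

    // Hypothetical watcher, shown only to illustrate the subscription API.
    public class StandbyWatcher {

      public static void watchStandby(Configuration conf, String haGroupName) throws Exception {
        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(conf);

        // The listener receives (groupName, toState, modifiedTime, clusterType)
        // each time the group transitions into the subscribed target state.
        HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) ->
          System.out.println(groupName + " reached " + toState + " on " + clusterType
            + " at " + modifiedTime);

        // Watch the local cluster; use ClusterType.PEER to watch the remote side instead.
        manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY,
          ClusterType.LOCAL, listener);

        // ... later, once notifications are no longer needed:
        manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY,
          ClusterType.LOCAL, listener);
      }
    }
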
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index f5df507c7b..da3f29d2c6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -18,7 +18,7 @@
 package org.apache.phoenix.jdbc;
 
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
-import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
 import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
@@ -93,9 +93,9 @@ public class HAGroupStoreClientIT extends BaseTest {
   @Before
   public void before() throws Exception {
     haAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(),
-      ZK_CONSISTENT_HA_NAMESPACE);
+      ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
     peerHaAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
-      ZK_CONSISTENT_HA_NAMESPACE);
+      ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
     
haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
     
peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
     zkUrl = getLocalZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration());
@@ -468,6 +468,8 @@ public class HAGroupStoreClientIT extends BaseTest {
     assertNotNull(currentRecord);
     assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
       currentRecord.getHAGroupState());
+    // The record should have a timestamp
+    assertNotNull(currentRecord.getLastSyncStateTimeInMs());
 
     record1 = new HAGroupStoreRecord("v1.0", haGroupName,
       HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC_TO_STANDBY);
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
index a4ae0aeb25..16d4545e7b 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
@@ -17,24 +17,34 @@
  */
 package org.apache.phoenix.jdbc;
 
-import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
+import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
+import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_SESSION_TIMEOUT_MULTIPLIER;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
 import static 
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.util.HAGroupStoreTestUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.zookeeper.data.Stat;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -64,13 +74,14 @@ public class HAGroupStoreManagerIT extends BaseTest {
   public static synchronized void doSetup() throws Exception {
     Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
     props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+    props.put(ZK_SESSION_TIMEOUT, String.valueOf(30 * 1000));
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     CLUSTERS.start();
   }
 
   @Before
   public void before() throws Exception {
-    haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_NAMESPACE);
+    haAdmin = new PhoenixHAAdmin(config, 
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
     zkUrl = getLocalZkUrl(config);
     this.peerZKUrl = CLUSTERS.getZkUrl2();
 
@@ -167,12 +178,97 @@ public class HAGroupStoreManagerIT extends BaseTest {
       haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
     assertTrue(retrievedOpt.isPresent());
 
-    // Record for comparison
-    HAGroupStoreRecord record = new 
HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
-      haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
+    // Get MTime from HAAdmin for equality verification below.
+    Pair<HAGroupStoreRecord, Stat> currentRecordAndStat =
+      haAdmin.getHAGroupStoreRecordInZooKeeper(haGroupName);
 
-    // Complete object comparison instead of field-by-field
-    assertEquals(record, retrievedOpt.get());
+    // Field-by-field comparison instead of complete object comparison
+    assertEquals(haGroupName, retrievedOpt.get().getHaGroupName());
+    assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
+      retrievedOpt.get().getHAGroupState());
+    Long lastSyncStateTimeInMs = retrievedOpt.get().getLastSyncStateTimeInMs();
+    Long mtime = currentRecordAndStat.getRight().getMtime();
+    // Allow a small margin of error
+    assertTrue(Math.abs(lastSyncStateTimeInMs - mtime) <= 1);
+    assertEquals(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
+      retrievedOpt.get().getProtocolVersion());
+  }
+
+  @Test
+  public void testGetPeerHAGroupStoreRecord() throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+
+    // Initially, peer record should not be present
+    Optional<HAGroupStoreRecord> peerRecordOpt =
+      haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+    assertFalse(peerRecordOpt.isPresent());
+
+    // Create a peer HAAdmin to create records in peer cluster
+    PhoenixHAAdmin peerHaAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
+      ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+
+    try {
+      // Create a HAGroupStoreRecord in the peer cluster
+      HAGroupStoreRecord peerRecord =
+        new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.STANDBY);
+
+      peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+      Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+      // Now peer record should be present
+      peerRecordOpt = 
haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+      assertTrue(peerRecordOpt.isPresent());
+
+      // Verify the peer record details
+      HAGroupStoreRecord retrievedPeerRecord = peerRecordOpt.get();
+      assertEquals(haGroupName, retrievedPeerRecord.getHaGroupName());
+      assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, 
retrievedPeerRecord.getHAGroupState());
+      assertEquals(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
+        retrievedPeerRecord.getProtocolVersion());
+
+      // Delete peer record
+      peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName);
+      Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+      // Peer record should no longer be present
+      peerRecordOpt = 
haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+      assertFalse(peerRecordOpt.isPresent());
+
+      // Create peer record again with different state
+      HAGroupStoreRecord newPeerRecord = new HAGroupStoreRecord("1.0", 
haGroupName,
+        HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER);
+
+      peerHaAdmin.createHAGroupStoreRecordInZooKeeper(newPeerRecord);
+      Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+      // Verify the updated peer record
+      peerRecordOpt = 
haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+      assertTrue(peerRecordOpt.isPresent());
+      assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER,
+        peerRecordOpt.get().getHAGroupState());
+
+    } finally {
+      // Clean up peer record
+      try {
+        peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName);
+      } catch (Exception e) {
+        // Ignore cleanup errors
+      }
+      peerHaAdmin.close();
+    }
+  }
+
+  @Test
+  public void testGetPeerHAGroupStoreRecordWhenHAGroupNotInSystemTable() 
throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+
+    // Try to get peer record for an HA group that doesn't exist in system 
table
+    Optional<HAGroupStoreRecord> peerRecordOpt =
+      haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+    assertFalse("Peer record should not be present for non-existent HA group",
+      peerRecordOpt.isPresent());
   }
 
   @Test
@@ -215,9 +311,13 @@ public class HAGroupStoreManagerIT extends BaseTest {
     conf.set(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "false");
     conf.set(HConstants.ZOOKEEPER_QUORUM, getLocalZkUrl(config));
 
-    HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+    // Set the HAGroupStoreManager instance to null via reflection to force 
recreation
+    Field field = 
HAGroupStoreManager.class.getDeclaredField("haGroupStoreManagerInstance");
+    field.setAccessible(true);
+    field.set(null, null);
 
-    // Create HAGroupStoreRecord with ACTIVE_TO_STANDBY role
+    HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+    // Create HAGroupStoreRecord with ACTIVE_IN_SYNC_TO_STANDBY role
     HAGroupStoreRecord transitionRecord = new HAGroupStoreRecord("1.0", 
haGroupName,
       HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
 
@@ -226,6 +326,10 @@ public class HAGroupStoreManagerIT extends BaseTest {
 
     // Mutations should not be blocked even with ACTIVE_TO_STANDBY role
     assertFalse(haGroupStoreManager.isMutationBlocked(haGroupName));
+
+    // Set the HAGroupStoreManager instance back to null via reflection to 
force
+    // recreation for other tests
+    field.set(null, null);
   }
 
   @Test
@@ -251,30 +355,49 @@ public class HAGroupStoreManagerIT extends BaseTest {
     HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
     assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
       updatedRecord.getHAGroupState());
+    assertNotNull(updatedRecord.getLastSyncStateTimeInMs());
+
+    // Set the HA group status to store and forward again and verify
+    // that getLastSyncStateTimeInMs stays the same (still ACTIVE_NOT_IN_SYNC).
+    // The time should only update when we move from AIS to ANIS.
+    haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+    Optional<HAGroupStoreRecord> updatedRecordOpt2 =
+      haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+    assertTrue(updatedRecordOpt2.isPresent());
+    HAGroupStoreRecord updatedRecord2 = updatedRecordOpt2.get();
+    assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
+      updatedRecord2.getHAGroupState());
+    assertEquals(updatedRecord.getLastSyncStateTimeInMs(),
+      updatedRecord2.getLastSyncStateTimeInMs());
   }
 
   @Test
-  public void testSetHAGroupStatusRecordToSync() throws Exception {
+  public void testSetHAGroupStatusToSync() throws Exception {
     String haGroupName = testName.getMethodName();
     HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
 
-    // Create an initial HAGroupStoreRecord with ACTIVE_NOT_IN_SYNC status
-    HAGroupStoreRecord initialRecord = new HAGroupStoreRecord("1.0", 
haGroupName,
-      HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
-
-    haAdmin.createHAGroupStoreRecordInZooKeeper(initialRecord);
-    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+    // Initial record should be present in ACTIVE_NOT_IN_SYNC status
+    HAGroupStoreRecord initialRecord =
+      haGroupStoreManager.getHAGroupStoreRecord(haGroupName).orElse(null);
+    assertNotNull(initialRecord);
+    assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
+      initialRecord.getHAGroupState());
+    assertNotNull(initialRecord.getLastSyncStateTimeInMs());
 
-    // Set the HA group status to sync (ACTIVE)
-    haGroupStoreManager.setHAGroupStatusRecordToSync(haGroupName);
+    // Set the HA group status to sync (ACTIVE_IN_SYNC); first wait for 
ZK_SESSION_TIMEOUT * ZK_SESSION_TIMEOUT_MULTIPLIER
+    Thread.sleep((long) Math.ceil(config.getLong(ZK_SESSION_TIMEOUT, 
DEFAULT_ZK_SESSION_TIMEOUT)
+      * ZK_SESSION_TIMEOUT_MULTIPLIER));
+    haGroupStoreManager.setHAGroupStatusToSync(haGroupName);
     Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
 
-    // Verify the status was updated to ACTIVE
+    // Verify the state was updated to ACTIVE_IN_SYNC
     Optional<HAGroupStoreRecord> updatedRecordOpt =
       haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
     assertTrue(updatedRecordOpt.isPresent());
     HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
-    assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, 
updatedRecord.getClusterRole());
+    assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, 
updatedRecord.getHAGroupState());
+    assertNull(updatedRecord.getLastSyncStateTimeInMs());
   }
 
   @Test
@@ -356,4 +479,143 @@ public class HAGroupStoreManagerIT extends BaseTest {
 
   }
 
+  @Test
+  public void testSetReaderToDegraded() throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+
+    // Update the auto-created record to STANDBY state for testing
+    HAGroupStoreRecord standbyRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.STANDBY);
+
+    // Read the record first so that the HA group initializes its ZNode, which
+    // lets us update it artificially via HAAdmin
+    Optional<HAGroupStoreRecord> currentRecord =
+      haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+    assertTrue(currentRecord.isPresent());
+
+    // Update via HAAdmin
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, standbyRecord, 0);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Call setReaderToDegraded
+    haGroupStoreManager.setReaderToDegraded(haGroupName);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Verify the status was updated to DEGRADED_STANDBY_FOR_READER
+    Optional<HAGroupStoreRecord> updatedRecordOpt =
+      haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+    assertTrue(updatedRecordOpt.isPresent());
+    HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
+    assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER,
+      updatedRecord.getHAGroupState());
+
+    // Test transition from DEGRADED_STANDBY_FOR_WRITER to DEGRADED_STANDBY
+    HAGroupStoreRecord degradedWriterRecord = new HAGroupStoreRecord("1.0", 
haGroupName,
+      HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER);
+
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
degradedWriterRecord, 2);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Call setReaderToDegraded again
+    haGroupStoreManager.setReaderToDegraded(haGroupName);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Verify the status was updated to DEGRADED_STANDBY
+    updatedRecordOpt = haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+    assertTrue(updatedRecordOpt.isPresent());
+    updatedRecord = updatedRecordOpt.get();
+    assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, 
updatedRecord.getHAGroupState());
+  }
+
+  @Test
+  public void testSetReaderToHealthy() throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+
+    // Read the record first so that the HA group initializes its ZNode, which
+    // lets us update it artificially via HAAdmin
+    Optional<HAGroupStoreRecord> currentRecord =
+      haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+    assertTrue(currentRecord.isPresent());
+
+    // Update the auto-created record to DEGRADED_STANDBY_FOR_READER state for 
testing
+    HAGroupStoreRecord degradedReaderRecord = new HAGroupStoreRecord("1.0", 
haGroupName,
+      HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER);
+
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
degradedReaderRecord, 0);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Call setReaderToHealthy
+    haGroupStoreManager.setReaderToHealthy(haGroupName);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Verify the status was updated to STANDBY
+    Optional<HAGroupStoreRecord> updatedRecordOpt =
+      haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+    assertTrue(updatedRecordOpt.isPresent());
+    HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
+    assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, 
updatedRecord.getHAGroupState());
+
+    // Test transition from DEGRADED_STANDBY to DEGRADED_STANDBY_FOR_WRITER
+    HAGroupStoreRecord degradedRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY);
+
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, degradedRecord, 
2);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Call setReaderToHealthy again
+    haGroupStoreManager.setReaderToHealthy(haGroupName);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Verify the status was updated to DEGRADED_STANDBY_FOR_WRITER
+    updatedRecordOpt = haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+    assertTrue(updatedRecordOpt.isPresent());
+    updatedRecord = updatedRecordOpt.get();
+    assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER,
+      updatedRecord.getHAGroupState());
+  }
+
+  @Test
+  public void testReaderStateTransitionInvalidStates() throws Exception {
+    String haGroupName = testName.getMethodName();
+    HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+
+    // Read the record first so that the HA group initializes its ZNode, which
+    // lets us update it artificially via HAAdmin
+    Optional<HAGroupStoreRecord> currentRecord =
+      haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+    assertTrue(currentRecord.isPresent());
+
+    // Update the auto-created record to ACTIVE_IN_SYNC state (invalid for 
both operations)
+    HAGroupStoreRecord activeRecord =
+      new HAGroupStoreRecord("1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+
+    haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, activeRecord, 0);
+    Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+    // Test setReaderToDegraded with invalid state
+    try {
+      haGroupStoreManager.setReaderToDegraded(haGroupName);
+      fail("Expected InvalidClusterRoleTransitionException for 
setReaderToDegraded "
+        + "with ACTIVE_IN_SYNC state");
+    } catch (InvalidClusterRoleTransitionException e) {
+      // Expected behavior
+      assertTrue("Exception should mention the invalid transition",
+        e.getMessage().contains("ACTIVE_IN_SYNC")
+          && e.getMessage().contains("DEGRADED_STANDBY_FOR_READER"));
+    }
+
+    // Test setReaderToHealthy with invalid state
+    try {
+      haGroupStoreManager.setReaderToHealthy(haGroupName);
+      fail(
+        "Expected InvalidClusterRoleTransitionException for setReaderToHealthy 
with ACTIVE_IN_SYNC state");
+    } catch (InvalidClusterRoleTransitionException e) {
+      // Expected behavior
+      assertTrue("Exception should mention the invalid transition",
+        e.getMessage().contains("ACTIVE_IN_SYNC") && 
e.getMessage().contains("STANDBY"));
+    }
+  }
+
 }
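
As a rough companion to the HAGroupStoreManagerIT changes above, here is a
sketch of how a server-side health check might drive the new reader
transitions; only the manager methods and the exception type named in this
diff are assumed, and the ReaderHealthReporter class itself is illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
    import org.apache.phoenix.jdbc.HAGroupStoreManager;

    // Hypothetical reporter, shown only to illustrate the reader-state API.
    public class ReaderHealthReporter {

      public static void reportReaderHealth(Configuration conf, String haGroupName,
        boolean healthy) throws Exception {
        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(conf);
        try {
          if (healthy) {
            // Per the tests above: DEGRADED_STANDBY_FOR_READER -> STANDBY, or
            // DEGRADED_STANDBY -> DEGRADED_STANDBY_FOR_WRITER.
            manager.setReaderToHealthy(haGroupName);
          } else {
            // Per the tests above: STANDBY -> DEGRADED_STANDBY_FOR_READER, or
            // DEGRADED_STANDBY_FOR_WRITER -> DEGRADED_STANDBY.
            manager.setReaderToDegraded(haGroupName);
          }
        } catch (InvalidClusterRoleTransitionException e) {
          // Raised when the current state (e.g. ACTIVE_IN_SYNC) allows no reader transition.
          throw e;
        }
      }
    }
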
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
index 79ae44203f..14391e6d62 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.jdbc;
 
-import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -73,9 +73,9 @@ public class PhoenixHAAdminIT extends BaseTest {
   @Before
   public void before() throws Exception {
     haAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(),
-      ZK_CONSISTENT_HA_NAMESPACE);
+      ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
     peerHaAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
-      ZK_CONSISTENT_HA_NAMESPACE);
+      ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
     cleanupTestZnodes();
   }
 
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
index b25c6c77a0..0a042eff93 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
@@ -114,4 +114,20 @@ public class HAGroupStoreTestUtil {
       conn.commit();
     }
   }
+
+  /**
+   * Deletes all HA group records from the system table for testing purposes.
+   * @param zkUrl the ZooKeeper URL to connect to
+   * @throws SQLException if the database operation fails
+   */
+  public static void deleteAllHAGroupRecordsInSystemTable(String zkUrl) throws 
SQLException {
+    // Delete all records from System Table
+    try (
+      PhoenixConnection conn = (PhoenixConnection) DriverManager
+        .getConnection(JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl);
+      Statement stmt = conn.createStatement()) {
+      stmt.execute("DELETE FROM " + SYSTEM_HA_GROUP_NAME);
+      conn.commit();
+    }
+  }
 }
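
Finally, a small hedged example of where the new test helper above might be
called from, for instance a JUnit fixture; the surrounding class and the
zkUrl field are illustrative only.

    import org.apache.phoenix.util.HAGroupStoreTestUtil;
    import org.junit.Before;

    // Hypothetical IT fixture, shown only to illustrate the new helper.
    public class SomeHAGroupIT {

      // Assumed to be initialized elsewhere from the mini cluster configuration.
      private String zkUrl;

      @Before
      public void resetHAGroupSystemTable() throws Exception {
        // Start each test from an empty HA group system table.
        HAGroupStoreTestUtil.deleteAllHAGroupRecordsInSystemTable(zkUrl);
      }
    }
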
