This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this push:
     new 5ffd6490ed PHOENIX-7566 HAGroupState subscription feature and state management for ReplicationLogReader (#2274)
5ffd6490ed is described below

commit 5ffd6490ede889ab65f8a2de5a8b27cdb968150e
Author: ritegarg <58840065+riteg...@users.noreply.github.com>
AuthorDate: Tue Sep 16 22:44:00 2025 -0700

    PHOENIX-7566 HAGroupState subscription feature and state management for ReplicationLogReader (#2274)
    
    Co-authored-by: Ritesh Garg <ritesh.g...@riteshg-ltmd34g.internal.salesforce.com>
---
 .../java/org/apache/phoenix/jdbc/ClusterType.java  |  35 ++
 .../apache/phoenix/jdbc/HAGroupStateListener.java  |  57 ++
 .../apache/phoenix/jdbc/HAGroupStoreClient.java    | 250 ++++++--
 .../apache/phoenix/jdbc/HAGroupStoreManager.java   | 211 +++++--
 .../apache/phoenix/jdbc/HAGroupStoreRecord.java    |  33 +-
 ...gionServerEndpointITWithConsistentFailover.java |   4 +-
 .../IndexRegionObserverMutationBlockingIT.java     |   4 +-
 .../phoenix/jdbc/HAGroupStateSubscriptionIT.java   | 644 +++++++++++++++++++++
 .../apache/phoenix/jdbc/HAGroupStoreClientIT.java  |   9 +-
 .../apache/phoenix/jdbc/HAGroupStoreManagerIT.java | 277 ++++++++-
 .../org/apache/phoenix/jdbc/PhoenixHAAdminIT.java  |   6 +-
 .../apache/phoenix/util/HAGroupStoreTestUtil.java  |  17 +-
 12 files changed, 1423 insertions(+), 124 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java
new file mode 100644
index 0000000000..e194c23e35
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+/**
+ * Enumeration representing the type of cluster in an HA group configuration.
+ * Used to distinguish between local and peer clusters when subscribing to
+ * HA group state change notifications.
+ */
+public enum ClusterType {
+    /**
+     * Represents the local cluster where the client is running.
+     */
+    LOCAL,
+
+    /**
+     * Represents the peer cluster in the HA group configuration.
+     */
+    PEER
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
new file mode 100644
index 0000000000..634f50e0d0
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
+
+/**
+ * Interface for external clients who want to be notified of HA group state transitions.
+ *
+ * <p>Listeners can subscribe to be notified when:</p>
+ * <ul>
+ *   <li>Specific state transitions occur (from one state to another)</li>
+ *   <li>Any transition to a target state occurs (from any state to a specific state)</li>
+ * </ul>
+ *
+ * <p>Notifications are provided for both local and peer cluster state changes,
+ * distinguished by the {@link ClusterType} parameter.</p>
+ *
+ * @see HAGroupStoreManager#subscribeToTargetState
+ */
+public interface HAGroupStateListener {
+
+    /**
+     * Called when an HA group state transition occurs.
+     *
+     * <p>Implementations should be fast and non-blocking to avoid impacting
+     * the HA group state management system. If heavy processing is required,
+     * consider delegating to a separate thread.</p>
+     *
+     * @param haGroupName the name of the HA group that transitioned
+     * @param toState the new state after the transition
+     * @param modifiedTime the time the state transition occurred
+     * @param clusterType whether this transition occurred on the local or peer cluster
+     *
+     * @throws Exception implementations may throw exceptions, but they will be
+     *                   logged and will not prevent other listeners from being notified
+     */
+    void onStateChange(String haGroupName,
+                       HAGroupState toState,
+                       long modifiedTime,
+                       ClusterType clusterType);
+}
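
For orientation, a minimal sketch of how a consumer (for example, the ReplicationLogReader work tracked by PHOENIX-7562) might register one of these listeners through HAGroupStoreManager. It mirrors the calls exercised in HAGroupStateSubscriptionIT further below; the group name, the listener body, and the omitted exception handling are placeholders rather than part of this commit:

    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
    HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) -> {
        // React to the transition; keep this fast and non-blocking per the interface contract.
    };
    // Notified whenever the local cluster reaches STANDBY, regardless of the prior state.
    manager.subscribeToTargetState("myHAGroup", HAGroupState.STANDBY, ClusterType.LOCAL, listener);
    // The same listener can later be removed.
    manager.unsubscribeFromTargetState("myHAGroup", HAGroupState.STANDBY, ClusterType.LOCAL, listener);
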
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index 123dc28d5c..e4743bc7fd 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -25,10 +25,13 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -76,14 +79,14 @@ import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
  */
 public class HAGroupStoreClient implements Closeable {
 
-    public static final String ZK_CONSISTENT_HA_NAMESPACE =
-            "phoenix" + ZKPaths.PATH_SEPARATOR + "consistentHA";
+    public static final String ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE = "phoenix"
+                    + ZKPaths.PATH_SEPARATOR + "consistentHA"
+                    + ZKPaths.PATH_SEPARATOR + "groupState";
     private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS = 30000L;
     // Multiplier for ZK session timeout to account for time it will take for HMaster to abort
     // the region server in case ZK connection is lost from the region server.
-    private static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1;
-    private static final String CACHE_TYPE_LOCAL = "LOCAL";
-    private static final String CACHE_TYPE_PEER = "PEER";
+    @VisibleForTesting
+    static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1;
     private PhoenixHAAdmin phoenixHaAdmin;
     private PhoenixHAAdmin peerPhoenixHaAdmin;
     private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class);
@@ -108,6 +111,14 @@ public class HAGroupStoreClient implements Closeable {
     private final PathChildrenCacheListener peerCustomPathChildrenCacheListener;
     // Wait time for sync mode
     private final long waitTimeForSyncModeInMs;
+    // State tracking for transition detection
+    private volatile HAGroupState lastKnownLocalState;
+    private volatile HAGroupState lastKnownPeerState;
+
+    // Subscription storage for HA group state change notifications per client instance
+    // Map key format: "clusterType:targetState" -> Set<Listeners>
+    private final ConcurrentHashMap<String, CopyOnWriteArraySet<HAGroupStateListener>>
+            targetStateSubscribers = new ConcurrentHashMap<>();
     // Policy for the HA group
     private HighAvailabilityPolicy policy;
     private ClusterRole clusterRole;
@@ -131,7 +142,7 @@ public class HAGroupStoreClient implements Closeable {
      * @return HAGroupStoreClient instance
      */
     public static HAGroupStoreClient getInstanceForZkUrl(Configuration conf, String haGroupName,
-            String zkUrl) throws SQLException {
+            String zkUrl) {
         Preconditions.checkNotNull(haGroupName, "haGroupName cannot be null");
         String localZkUrl = Objects.toString(zkUrl, getLocalZkUrl(conf));
         Preconditions.checkNotNull(localZkUrl, "zkUrl cannot be null");
@@ -159,7 +170,7 @@ public class HAGroupStoreClient implements Closeable {
     /**
      * Get the list of HAGroupNames from system table.
      * We can also get the list of HAGroupNames from the system table by providing the zkUrl in
-     * where clause but we need to match the formatted zkUrl with the zkUrl in the system table so
+     * where clause, but we need to match the formatted zkUrl with the zkUrl in the system table so
      * that matching is done correctly.
      *
      * @param zkUrl for connecting to Table
@@ -206,10 +217,11 @@ public class HAGroupStoreClient implements Closeable {
             // Initialize HAGroupStoreClient attributes
             initializeHAGroupStoreClientAttributes(haGroupName);
             // Initialize Phoenix HA Admin
-            this.phoenixHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, ZK_CONSISTENT_HA_NAMESPACE);
+            this.phoenixHaAdmin = new PhoenixHAAdmin(this.zkUrl,
+                    conf, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
             // Initialize local cache
             this.pathChildrenCache = initializePathChildrenCache(phoenixHaAdmin,
-                    pathChildrenCacheListener, CACHE_TYPE_LOCAL);
+                    pathChildrenCacheListener, ClusterType.LOCAL);
             // Initialize ZNode if not present in ZK
             initializeZNodeIfNeeded();
             if (this.pathChildrenCache != null) {
@@ -263,7 +275,7 @@ public class HAGroupStoreClient implements Closeable {
         if (!isHealthy) {
             throw new IOException("HAGroupStoreClient is not healthy");
         }
-        return fetchCacheRecord(this.pathChildrenCache, CACHE_TYPE_LOCAL).getLeft();
+        return fetchCacheRecord(this.pathChildrenCache, ClusterType.LOCAL).getLeft();
     }
 
     /**
@@ -283,7 +295,7 @@ public class HAGroupStoreClient implements Closeable {
             throw new IOException("HAGroupStoreClient is not healthy");
         }
         Pair<HAGroupStoreRecord, Stat> cacheRecord = fetchCacheRecord(
-                this.pathChildrenCache, CACHE_TYPE_LOCAL);
+                this.pathChildrenCache, ClusterType.LOCAL);
         HAGroupStoreRecord currentHAGroupStoreRecord = cacheRecord.getLeft();
         Stat currentHAGroupStoreRecordStat = cacheRecord.getRight();
         if (currentHAGroupStoreRecord == null) {
@@ -293,10 +305,29 @@ public class HAGroupStoreClient implements Closeable {
         }
         if (isUpdateNeeded(currentHAGroupStoreRecord.getHAGroupState(),
                 currentHAGroupStoreRecordStat.getMtime(), haGroupState)) {
+                // We maintain last sync time as the last time cluster was in sync state.
+                // If state changes from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, record that time
+                // Once state changes back to ACTIVE_IN_SYNC or the role is
+                // NOT ACTIVE or ACTIVE_TO_STANDBY
+                // set the time to null to mark that we are current(or we don't have any reader).
+                // TODO: Verify that for reader this is the correct approach.
+                Long lastSyncTimeInMs = currentHAGroupStoreRecord
+                        .getLastSyncStateTimeInMs();
+                ClusterRole clusterRole = haGroupState.getClusterRole();
+                if (currentHAGroupStoreRecord.getHAGroupState()
+                        == HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC
+                        && haGroupState == HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC) {
+                    lastSyncTimeInMs = System.currentTimeMillis();
+                } else if (haGroupState == HAGroupState.ACTIVE_IN_SYNC
+                        || !(ClusterRole.ACTIVE.equals(clusterRole)
+                            || ClusterRole.ACTIVE_TO_STANDBY.equals(clusterRole))) {
+                    lastSyncTimeInMs = null;
+                }
                 HAGroupStoreRecord newHAGroupStoreRecord = new HAGroupStoreRecord(
                         currentHAGroupStoreRecord.getProtocolVersion(),
                         currentHAGroupStoreRecord.getHaGroupName(),
-                        haGroupState
+                        haGroupState,
+                        lastSyncTimeInMs
                 );
                 // TODO: Check if cluster role is changing, if so, we need to update
                 // the system table first
@@ -339,11 +370,11 @@ public class HAGroupStoreClient implements Closeable {
      * @return HAGroupStoreRecord for the specified HA group name, or null if not found
      * @throws IOException if the client is not healthy
      */
-    private HAGroupStoreRecord getHAGroupStoreRecordFromPeer() throws IOException {
+    public HAGroupStoreRecord getHAGroupStoreRecordFromPeer() throws IOException {
         if (!isHealthy) {
             throw new IOException("HAGroupStoreClient is not healthy");
         }
-        return fetchCacheRecord(this.peerPathChildrenCache, CACHE_TYPE_PEER).getLeft();
+        return fetchCacheRecord(this.peerPathChildrenCache, ClusterType.PEER).getLeft();
     }
 
     private void initializeZNodeIfNeeded() throws IOException,
@@ -353,13 +384,21 @@ public class HAGroupStoreClient implements Closeable {
         Pair<HAGroupStoreRecord, Stat> cacheRecordFromZK =
                 phoenixHaAdmin.getHAGroupStoreRecordInZooKeeper(this.haGroupName);
         HAGroupStoreRecord haGroupStoreRecord = cacheRecordFromZK.getLeft();
+        HAGroupState defaultHAGroupState = this.clusterRole.getDefaultHAGroupState();
+        // Initialize lastSyncTimeInMs only if we start in ACTIVE_NOT_IN_SYNC state
+        //  and ZNode is not already present
+        Long lastSyncTimeInMs = defaultHAGroupState.equals(HAGroupState.ACTIVE_NOT_IN_SYNC)
+                                    ? System.currentTimeMillis()
+                                    : null;
         HAGroupStoreRecord newHAGroupStoreRecord = new HAGroupStoreRecord(
             HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
             haGroupName,
-            this.clusterRole.getDefaultHAGroupState()
+            this.clusterRole.getDefaultHAGroupState(),
+            lastSyncTimeInMs
         );
         // Only update current ZNode if it doesn't have the same role as present in System Table.
         // If not exists, then create ZNode
+        // TODO: Discuss if this approach is what reader needs.
         if (haGroupStoreRecord != null &&
                 !haGroupStoreRecord.getClusterRole().equals(this.clusterRole)) {
             phoenixHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
@@ -431,16 +470,17 @@ public class HAGroupStoreClient implements Closeable {
             try {
                 // Setup peer connection if needed (first time or ZK Url changed)
                 if (peerPathChildrenCache == null
-                    || (peerPhoenixHaAdmin != null &&
-                            !StringUtils.equals(this.peerZKUrl, peerPhoenixHaAdmin.getZkUrl()))) {
+                    || peerPhoenixHaAdmin != null
+                        && !StringUtils.equals(this.peerZKUrl, peerPhoenixHaAdmin.getZkUrl())) {
                     // Clean up existing peer connection if it exists
                     closePeerConnection();
                     // Setup new peer connection
                     this.peerPhoenixHaAdmin
-                            = new PhoenixHAAdmin(this.peerZKUrl, conf, ZK_CONSISTENT_HA_NAMESPACE);
+                            = new PhoenixHAAdmin(this.peerZKUrl, conf,
+                            ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
                     // Create new PeerPathChildrenCache
                     this.peerPathChildrenCache = initializePathChildrenCache(peerPhoenixHaAdmin,
-                            this.peerCustomPathChildrenCacheListener, CACHE_TYPE_PEER);
+                            this.peerCustomPathChildrenCacheListener, ClusterType.PEER);
                 }
             } catch (Exception e) {
                 closePeerConnection();
@@ -459,7 +499,8 @@ public class HAGroupStoreClient implements Closeable {
     }
 
     private PathChildrenCache initializePathChildrenCache(PhoenixHAAdmin admin,
-            PathChildrenCacheListener customListener, String cacheType) {
+                                                          PathChildrenCacheListener customListener,
+                                                          ClusterType cacheType) {
         LOGGER.info("Initializing {} PathChildrenCache with URL {}", cacheType, admin.getZkUrl());
         PathChildrenCache newPathChildrenCache = null;
         try {
@@ -487,30 +528,38 @@ public class HAGroupStoreClient implements Closeable {
         }
     }
 
-    private PathChildrenCacheListener createCacheListener(CountDownLatch latch, String cacheType) {
+    private PathChildrenCacheListener createCacheListener(CountDownLatch latch,
+                                                          ClusterType cacheType) {
         return (client, event) -> {
             final ChildData childData = event.getData();
-            HAGroupStoreRecord eventRecord = extractHAGroupStoreRecordOrNull(childData);
+            Pair<HAGroupStoreRecord, Stat> eventRecordAndStat
+                    = extractHAGroupStoreRecordOrNull(childData);
+            HAGroupStoreRecord eventRecord = eventRecordAndStat.getLeft();
+            Stat eventStat = eventRecordAndStat.getRight();
             LOGGER.info("HAGroupStoreClient Cache {} received event {} type {} 
at {}",
                     cacheType, eventRecord, event.getType(), 
System.currentTimeMillis());
             switch (event.getType()) {
-                // TODO: Add support for event watcher for HAGroupStoreRecord
-                // case CHILD_ADDED:
-                // case CHILD_UPDATED:
-                // case CHILD_REMOVED:
+                case CHILD_ADDED:
+                case CHILD_UPDATED:
+                    if (eventRecord != null) {
+                        handleStateChange(eventRecord, eventStat, cacheType);
+                    }
+                    break;
+                case CHILD_REMOVED:
+                    break;
                 case INITIALIZED:
                     latch.countDown();
                     break;
                 case CONNECTION_LOST:
                 case CONNECTION_SUSPENDED:
-                    if (CACHE_TYPE_LOCAL.equals(cacheType)) {
+                    if (ClusterType.LOCAL.equals(cacheType)) {
                         isHealthy = false;
                     }
                     LOGGER.warn("{} HAGroupStoreClient cache connection 
lost/suspended",
                             cacheType);
                     break;
                 case CONNECTION_RECONNECTED:
-                    if (CACHE_TYPE_LOCAL.equals(cacheType)) {
+                    if (ClusterType.LOCAL.equals(cacheType)) {
                         isHealthy = true;
                     }
                     LOGGER.info("{} HAGroupStoreClient cache connection 
reconnected", cacheType);
@@ -524,7 +573,7 @@ public class HAGroupStoreClient implements Closeable {
 
 
     private Pair<HAGroupStoreRecord, Stat> fetchCacheRecord(PathChildrenCache cache,
-            String cacheType) {
+                                                            ClusterType cacheType) {
         if (cache == null) {
             LOGGER.warn("{} HAGroupStoreClient cache is null, returning null", 
cacheType);
             return Pair.of(null, null);
@@ -537,7 +586,7 @@ public class HAGroupStoreClient implements Closeable {
             return result;
         }
 
-        if (cacheType.equals(CACHE_TYPE_PEER)) {
+        if (cacheType.equals(ClusterType.PEER)) {
             return Pair.of(null, null);
         }
         // If no record found, try to rebuild and fetch again
@@ -555,23 +604,25 @@ public class HAGroupStoreClient implements Closeable {
     }
 
     private Pair<HAGroupStoreRecord, Stat> extractRecordAndStat(PathChildrenCache cache,
-            String targetPath, String cacheType) {
+                                                                String targetPath,
+                                                                ClusterType cacheType) {
         ChildData childData = cache.getCurrentData(targetPath);
         if (childData != null) {
-            HAGroupStoreRecord record = extractHAGroupStoreRecordOrNull(childData);
-            Stat currentStat = childData.getStat();
-            LOGGER.info("Built {} cluster record: {}", cacheType, record);
-            return Pair.of(record, currentStat);
+            Pair<HAGroupStoreRecord, Stat> recordAndStat
+                    = extractHAGroupStoreRecordOrNull(childData);
+            LOGGER.info("Built {} cluster record: {}", cacheType, 
recordAndStat.getLeft());
+            return recordAndStat;
         }
         return Pair.of(null, null);
     }
 
-    private HAGroupStoreRecord extractHAGroupStoreRecordOrNull(final ChildData childData) {
+    private Pair<HAGroupStoreRecord, Stat> extractHAGroupStoreRecordOrNull(
+            final ChildData childData) {
         if (childData != null) {
             byte[] data = childData.getData();
-            return HAGroupStoreRecord.fromJson(data).orElse(null);
+            return Pair.of(HAGroupStoreRecord.fromJson(data).orElse(null), childData.getStat());
         }
-        return null;
+        return Pair.of(null, null);
     }
 
 
@@ -638,4 +689,127 @@ public class HAGroupStoreClient implements Closeable {
         return ((System.currentTimeMillis() - currentHAGroupStoreRecordMtime) > waitTime);
     }
 
+    // ========== HA Group State Change Subscription Methods ==========
+
+    /**
+     * Subscribe to be notified when any transition to a target state occurs.
+     *
+     * @param targetState the target state to watch for
+     * @param clusterType whether to monitor local or peer cluster
+     * @param listener the listener to notify when any transition to the target state occurs
+     */
+    public void subscribeToTargetState(HAGroupState targetState,
+                                       ClusterType clusterType, HAGroupStateListener listener) {
+        String key = buildTargetStateKey(clusterType, targetState);
+        targetStateSubscribers.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>()).add(listener);
+        LOGGER.info("Subscribed listener to target state {} for HA group {} on {} cluster",
+                targetState, haGroupName, clusterType);
+    }
+
+    /**
+     * Unsubscribe from target state notifications.
+     *
+     * @param targetState the target state
+     * @param clusterType whether monitoring local or peer cluster
+     * @param listener the listener to remove
+     */
+    public void unsubscribeFromTargetState(HAGroupState targetState,
+                                           ClusterType clusterType, HAGroupStateListener listener) {
+        String key = buildTargetStateKey(clusterType, targetState);
+        CopyOnWriteArraySet<HAGroupStateListener> listeners = targetStateSubscribers.get(key);
+        if (listeners != null && listeners.remove(listener)) {
+            if (listeners.isEmpty()) {
+                targetStateSubscribers.remove(key);
+            }
+            LOGGER.info("Unsubscribed listener from target state {} for HA 
group {} on {} cluster",
+                    targetState, haGroupName, clusterType);
+        }
+    }
+
+    /**
+     * Handle state change detection and notify subscribers if a transition occurred.
+     *
+     * @param newRecord the new HA group store record
+     * @param cacheType the type of cache (LOCAL or PEER)
+     */
+    private void handleStateChange(HAGroupStoreRecord newRecord,
+                                   Stat newStat, ClusterType cacheType) {
+        HAGroupState newState = newRecord.getHAGroupState();
+        HAGroupState oldState;
+        ClusterType clusterType;
+
+        if (ClusterType.LOCAL.equals(cacheType)) {
+            oldState = lastKnownLocalState;
+            lastKnownLocalState = newState;
+            clusterType = ClusterType.LOCAL;
+        } else {
+            oldState = lastKnownPeerState;
+            lastKnownPeerState = newState;
+            clusterType = ClusterType.PEER;
+        }
+
+        // Only notify if there's an actual state transition or initial state
+        if (oldState == null || !oldState.equals(newState)) {
+            LOGGER.info("Detected state transition for HA group {} from {} to 
{} on {} cluster",
+                    haGroupName, oldState, newState, clusterType);
+            notifySubscribers(oldState, newState, newStat.getMtime(), 
clusterType);
+        }
+    }
+
+    /**
+     * Notify all relevant subscribers of a state transition.
+     *
+     * @param fromState the state transitioned from
+     * @param toState the state transitioned to
+     * @param clusterType the cluster type where the transition occurred
+     */
+    private void notifySubscribers(HAGroupState fromState,
+                                   HAGroupState toState,
+                                   long modifiedTime,
+                                   ClusterType clusterType) {
+        LOGGER.debug("Notifying subscribers of state transition "
+                        + "for HA group {} from {} to {} on {} cluster",
+                haGroupName, fromState, toState, clusterType);
+        String targetStateKey = buildTargetStateKey(clusterType, toState);
+
+        // Collect all listeners that need to be notified
+        Set<HAGroupStateListener> listenersToNotify = new HashSet<>();
+
+        // Find target state subscribers
+        CopyOnWriteArraySet<HAGroupStateListener> targetListeners
+                = targetStateSubscribers.get(targetStateKey);
+        if (targetListeners != null) {
+            listenersToNotify.addAll(targetListeners);
+        }
+
+        // Notify all listeners with error isolation
+        if (!listenersToNotify.isEmpty()) {
+            LOGGER.info("Notifying {} listeners of state transition"
+                            + "for HA group {} from {} to {} on {} cluster",
+                    listenersToNotify.size(), haGroupName, fromState, toState, clusterType);
+
+            for (HAGroupStateListener listener : listenersToNotify) {
+                try {
+                    listener.onStateChange(haGroupName,
+                            toState, modifiedTime, clusterType);
+                } catch (Exception e) {
+                    LOGGER.error("Error notifying listener of state transition 
"
+                                    + "for HA group {} from {} to {} on {} 
cluster",
+                            haGroupName, fromState, toState, clusterType, e);
+                    // Continue notifying other listeners
+                }
+            }
+        }
+    }
+
+    // ========== Helper Methods ==========
+
+    /**
+     * Build key for target state subscriptions.
+     */
+    private String buildTargetStateKey(ClusterType clusterType,
+                                       HAGroupState targetState) {
+        return clusterType + ":" + targetState;
+    }
+
 }
\ No newline at end of file
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
index e04d1a2dac..7045a301f3 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -36,7 +36,8 @@ import static org.apache.phoenix.query.QueryServicesOptions
 
 /**
  * Implementation of HAGroupStoreManager that uses HAGroupStoreClient.
- * Manages all HAGroupStoreClient instances.
+ * Manages all HAGroupStoreClient instances and provides passthrough
+ * functionality for HA group state change notifications.
  */
 public class HAGroupStoreManager {
     private static volatile HAGroupStoreManager haGroupStoreManagerInstance;
@@ -78,7 +79,6 @@ public class HAGroupStoreManager {
         return HAGroupStoreClient.getHAGroupNames(this.zkUrl);
     }
 
-
     /**
      * Checks whether mutation is blocked or not for a specific HA group.
      *
@@ -87,17 +87,15 @@ public class HAGroupStoreManager {
      * @return true if mutation is blocked, false otherwise.
      * @throws IOException when HAGroupStoreClient is not healthy.
      */
-    public boolean isMutationBlocked(String haGroupName) throws IOException, SQLException {
+    public boolean isMutationBlocked(String haGroupName) throws IOException {
         if (mutationBlockEnabled) {
-            HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf,
-                    haGroupName, zkUrl);
-            if (haGroupStoreClient != null) {
-                return haGroupStoreClient.getHAGroupStoreRecord() != null
-                        && haGroupStoreClient.getHAGroupStoreRecord().getClusterRole() != null
-                        && haGroupStoreClient.getHAGroupStoreRecord().getClusterRole()
-                        .isMutationBlocked();
-            }
-            throw new IOException("HAGroupStoreClient is not initialized");
+            HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+            HAGroupStoreRecord recordWithMetadata
+                    = haGroupStoreClient.getHAGroupStoreRecord();
+            return recordWithMetadata != null
+                    && recordWithMetadata.getClusterRole() != null
+                    && recordWithMetadata.getClusterRole()
+                    .isMutationBlocked();
         }
         return false;
     }
@@ -138,13 +136,8 @@ public class HAGroupStoreManager {
      */
     public void invalidateHAGroupStoreClient(final String haGroupName,
             boolean broadcastUpdate) throws Exception {
-        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf,
-                haGroupName, zkUrl);
-        if (haGroupStoreClient != null) {
-            haGroupStoreClient.rebuild();
-        } else {
-            throw new IOException("HAGroupStoreClient is not initialized");
-        }
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        haGroupStoreClient.rebuild();
     }
 
     /**
@@ -156,13 +149,23 @@ public class HAGroupStoreManager {
      * @throws IOException when HAGroupStoreClient is not healthy.
      */
     public Optional<HAGroupStoreRecord> getHAGroupStoreRecord(final String haGroupName)
-            throws IOException, SQLException {
-        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf,
-                haGroupName, zkUrl);
-        if (haGroupStoreClient != null) {
-            return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecord());
-        }
-        throw new IOException("HAGroupStoreClient is not initialized");
+            throws IOException {
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecord());
+    }
+
+    /**
+     * Returns the HAGroupStoreRecord for a specific HA group from peer cluster.
+     *
+     * @param haGroupName name of the HA group
+     * @return Optional HAGroupStoreRecord for the HA group from peer cluster can be empty if
+     *        the HA group is not found or peer cluster is not available.
+     * @throws IOException when HAGroupStoreClient is not healthy.
+     */
+    public Optional<HAGroupStoreRecord> getPeerHAGroupStoreRecord(final String haGroupName)
+            throws IOException {
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecordFromPeer());
     }
 
     /**
@@ -173,15 +176,10 @@ public class HAGroupStoreManager {
      */
     public void setHAGroupStatusToStoreAndForward(final String haGroupName)
             throws IOException, StaleHAGroupStoreRecordVersionException,
-            InvalidClusterRoleTransitionException, SQLException {
-        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf,
-                haGroupName, zkUrl);
-        if (haGroupStoreClient != null) {
-            haGroupStoreClient.setHAGroupStatusIfNeeded(
-                    HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
-        } else {
-            throw new IOException("HAGroupStoreClient is not initialized");
-        }
+            InvalidClusterRoleTransitionException {
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        haGroupStoreClient.setHAGroupStatusIfNeeded(
+                HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
     }
 
     /**
@@ -190,19 +188,78 @@ public class HAGroupStoreManager {
      * @param haGroupName name of the HA group
      * @throws IOException when HAGroupStoreClient is not healthy.
      */
-    public void setHAGroupStatusRecordToSync(final String haGroupName)
+    public void setHAGroupStatusToSync(final String haGroupName)
             throws IOException, StaleHAGroupStoreRecordVersionException,
-            InvalidClusterRoleTransitionException, SQLException {
-        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf,
-                haGroupName, zkUrl);
-        if (haGroupStoreClient != null) {
-            haGroupStoreClient.setHAGroupStatusIfNeeded(
-                    HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
-        } else {
-            throw new IOException("HAGroupStoreClient is not initialized");
+            InvalidClusterRoleTransitionException {
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        haGroupStoreClient.setHAGroupStatusIfNeeded(
+                HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+    }
+
+    /**
+     * Sets the HAGroupStoreRecord to degrade reader functionality in local cluster.
+     * Transitions from STANDBY to DEGRADED_STANDBY_FOR_READER or from
+     * DEGRADED_STANDBY_FOR_WRITER to DEGRADED_STANDBY.
+     *
+     * @param haGroupName name of the HA group
+     * @throws IOException when HAGroupStoreClient is not healthy.
+     * @throws InvalidClusterRoleTransitionException when the current state
+     *   cannot transition to a degraded reader state
+     */
+    public void setReaderToDegraded(final String haGroupName)
+            throws IOException, StaleHAGroupStoreRecordVersionException,
+            InvalidClusterRoleTransitionException {
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        HAGroupStoreRecord currentRecord
+                = haGroupStoreClient.getHAGroupStoreRecord();
+
+        if (currentRecord == null) {
+            throw new IOException("Current HAGroupStoreRecord is null for HA 
group: "
+                    + haGroupName);
+        }
+
+        HAGroupStoreRecord.HAGroupState currentState = currentRecord.getHAGroupState();
+        HAGroupStoreRecord.HAGroupState targetState
+                = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER;
+
+        if (currentState == HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER) {
+            targetState = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY;
         }
+
+        haGroupStoreClient.setHAGroupStatusIfNeeded(targetState);
     }
 
+    /**
+     * Sets the HAGroupStoreRecord to restore reader functionality in local cluster.
+     * Transitions from DEGRADED_STANDBY_FOR_READER to STANDBY or from
+     * DEGRADED_STANDBY to DEGRADED_STANDBY_FOR_WRITER.
+     *
+     * @param haGroupName name of the HA group
+     * @throws IOException when HAGroupStoreClient is not healthy.
+     * @throws InvalidClusterRoleTransitionException when the current state
+     *   cannot transition to a healthy reader state
+     */
+    public void setReaderToHealthy(final String haGroupName)
+            throws IOException, StaleHAGroupStoreRecordVersionException,
+            InvalidClusterRoleTransitionException {
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        HAGroupStoreRecord currentRecord
+                = haGroupStoreClient.getHAGroupStoreRecord();
+
+        if (currentRecord == null) {
+            throw new IOException("Current HAGroupStoreRecord is null "
+                    + "for HA group: " + haGroupName);
+        }
+
+        HAGroupStoreRecord.HAGroupState currentState = currentRecord.getHAGroupState();
+        HAGroupStoreRecord.HAGroupState targetState = HAGroupStoreRecord.HAGroupState.STANDBY;
+
+        if (currentState == HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY) {
+            targetState = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER;
+        }
+
+        haGroupStoreClient.setHAGroupStatusIfNeeded(targetState);
+    }
 
     /**
      * Returns the ClusterRoleRecord for the cluster pair.
@@ -214,12 +271,70 @@ public class HAGroupStoreManager {
      * @throws IOException when HAGroupStoreClient is not healthy.
      */
     public ClusterRoleRecord getClusterRoleRecord(String haGroupName)
-            throws IOException, SQLException {
+            throws IOException {
+        HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+        return haGroupStoreClient.getClusterRoleRecord();
+    }
+
+    /**
+     * Subscribe to be notified when any transition to a target state occurs.
+     *
+     * @param haGroupName the name of the HA group to monitor
+     * @param targetState the target state to watch for
+     * @param clusterType whether to monitor local or peer cluster
+     * @param listener the listener to notify when any transition to the target state occurs
+     * @throws IOException if unable to get HAGroupStoreClient instance
+     */
+    public void subscribeToTargetState(String haGroupName,
+                                       HAGroupStoreRecord.HAGroupState targetState,
+                                       ClusterType clusterType,
+                                       HAGroupStateListener listener) throws IOException {
+        HAGroupStoreClient client = getHAGroupStoreClient(haGroupName);
+        client.subscribeToTargetState(targetState, clusterType, listener);
+        LOGGER.debug("Delegated subscription to target state {} "
+                        + "for HA group {} on {} cluster to client",
+                targetState, haGroupName, clusterType);
+    }
+
+    /**
+     * Unsubscribe from target state notifications.
+     *
+     * @param haGroupName the name of the HA group
+     * @param targetState the target state
+     * @param clusterType whether monitoring local or peer cluster
+     * @param listener the listener to remove
+     */
+    public void unsubscribeFromTargetState(String haGroupName,
+                                           HAGroupStoreRecord.HAGroupState targetState,
+                                           ClusterType clusterType,
+                                           HAGroupStateListener listener) {
+        try {
+            HAGroupStoreClient client = getHAGroupStoreClient(haGroupName);
+            client.unsubscribeFromTargetState(targetState, clusterType, listener);
+            LOGGER.debug("Delegated unsubscription from target state {} "
+                            + "for HA group {} on {} cluster to client",
+                    targetState, haGroupName, clusterType);
+        } catch (IOException e) {
+            LOGGER.warn("HAGroupStoreClient not found for HA group: {} - 
cannot unsubscribe: {}",
+                       haGroupName, e.getMessage());
+        }
+    }
+
+    /**
+     * Helper method to get HAGroupStoreClient instance with consistent error handling.
+     *
+     * @param haGroupName name of the HA group
+     * @return HAGroupStoreClient instance for the specified HA group
+     * @throws IOException when HAGroupStoreClient is not initialized
+     */
+    private HAGroupStoreClient getHAGroupStoreClient(final String haGroupName)
+            throws IOException {
         HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf,
                 haGroupName, zkUrl);
-        if (haGroupStoreClient != null) {
-            return haGroupStoreClient.getClusterRoleRecord();
+        if (haGroupStoreClient == null) {
+            throw new IOException("HAGroupStoreClient is not initialized "
+                    + "for HA group: " + haGroupName);
         }
-        throw new IOException("HAGroupStoreClient is not initialized");
+        return haGroupStoreClient;
     }
 }
\ No newline at end of file
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
index aa49a22eb2..6891d93c46 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
@@ -118,9 +118,11 @@ public class HAGroupStoreRecord {
             OFFLINE.allowedTransitions = ImmutableSet.of();
             // This needs to be manually recovered by operator
             UNKNOWN.allowedTransitions = ImmutableSet.of();
-            ACTIVE_NOT_IN_SYNC_TO_STANDBY.allowedTransitions = ImmutableSet.of(ABORT_TO_ACTIVE_NOT_IN_SYNC,
+            ACTIVE_NOT_IN_SYNC_TO_STANDBY.allowedTransitions
+                    = ImmutableSet.of(ABORT_TO_ACTIVE_NOT_IN_SYNC,
                     ACTIVE_IN_SYNC_TO_STANDBY);
-            ACTIVE_IN_SYNC_TO_STANDBY.allowedTransitions = ImmutableSet.of(ABORT_TO_ACTIVE_IN_SYNC, STANDBY);
+            ACTIVE_IN_SYNC_TO_STANDBY.allowedTransitions
+                    = ImmutableSet.of(ABORT_TO_ACTIVE_IN_SYNC, STANDBY);
             STANDBY_TO_ACTIVE.allowedTransitions = ImmutableSet.of(ABORT_TO_STANDBY,
                     ACTIVE_IN_SYNC);
             DEGRADED_STANDBY.allowedTransitions
@@ -161,17 +163,28 @@ public class HAGroupStoreRecord {
     private final String protocolVersion;
     private final String haGroupName;
     private final HAGroupState haGroupState;
+    private final Long lastSyncStateTimeInMs;
 
     @JsonCreator
     public HAGroupStoreRecord(@JsonProperty("protocolVersion") String protocolVersion,
                               @JsonProperty("haGroupName") String haGroupName,
-                              @JsonProperty("haGroupState") HAGroupState haGroupState) {
+                              @JsonProperty("haGroupState") HAGroupState haGroupState,
+                              @JsonProperty("lastSyncStateTimeInMs") Long lastSyncStateTimeInMs) {
         Preconditions.checkNotNull(haGroupName, "HA group name cannot be null!");
         Preconditions.checkNotNull(haGroupState, "HA group state cannot be null!");
 
         this.protocolVersion = Objects.toString(protocolVersion, DEFAULT_PROTOCOL_VERSION);
         this.haGroupName = haGroupName;
         this.haGroupState = haGroupState;
+        this.lastSyncStateTimeInMs = lastSyncStateTimeInMs;
+    }
+
+    /**
+     * Convenience constructor for backward compatibility without lastSyncStateTimeInMs.
+     */
+    public HAGroupStoreRecord(String protocolVersion,
+                              String haGroupName, HAGroupState haGroupState) {
+        this(protocolVersion, haGroupName, haGroupState, null);
     }
 
     public static Optional<HAGroupStoreRecord> fromJson(byte[] bytes) {
@@ -193,9 +206,10 @@ public class HAGroupStoreRecord {
     }
 
     public boolean hasSameInfo(HAGroupStoreRecord other) {
-        return haGroupName.equals(other.haGroupName) &&
-                haGroupState.equals(other.haGroupState) &&
-                protocolVersion.equals(other.protocolVersion);
+        return haGroupName.equals(other.haGroupName)
+                && haGroupState.equals(other.haGroupState)
+                && protocolVersion.equals(other.protocolVersion)
+                && Objects.equals(lastSyncStateTimeInMs, other.lastSyncStateTimeInMs);
     }
 
     public String getProtocolVersion() {
@@ -211,6 +225,10 @@ public class HAGroupStoreRecord {
         return haGroupState;
     }
 
+    public Long getLastSyncStateTimeInMs() {
+        return lastSyncStateTimeInMs;
+    }
+
     @JsonIgnore
     public ClusterRoleRecord.ClusterRole getClusterRole() {
         return haGroupState.getClusterRole();
@@ -222,6 +240,7 @@ public class HAGroupStoreRecord {
                 .append(protocolVersion)
                 .append(haGroupName)
                 .append(haGroupState)
+                .append(lastSyncStateTimeInMs)
                 .hashCode();
     }
 
@@ -239,6 +258,7 @@ public class HAGroupStoreRecord {
                     .append(protocolVersion, otherRecord.protocolVersion)
                     .append(haGroupName, otherRecord.haGroupName)
                     .append(haGroupState, otherRecord.haGroupState)
+                    .append(lastSyncStateTimeInMs, otherRecord.lastSyncStateTimeInMs)
                     .isEquals();
         }
     }
@@ -249,6 +269,7 @@ public class HAGroupStoreRecord {
                 + "protocolVersion='" + protocolVersion + '\''
                 + ", haGroupName='" + haGroupName + '\''
                 + ", haGroupState=" + haGroupState
+                + ", lastSyncStateTimeInMs=" + lastSyncStateTimeInMs
                 + '}';
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
index 9aa6173fd1..e66da0829b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
@@ -43,7 +43,7 @@ import org.junit.rules.TestName;
 
 import java.util.Map;
 
-import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -83,7 +83,7 @@ public class PhoenixRegionServerEndpointITWithConsistentFailover extends BaseTes
         assertNotNull(coprocessor);
         ServerRpcController controller = new ServerRpcController();
 
-        try (PhoenixHAAdmin peerHAAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_NAMESPACE)) {
+        try (PhoenixHAAdmin peerHAAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE)) {
             HAGroupStoreRecord peerHAGroupStoreRecord = new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, HAGroupState.STANDBY);
             peerHAAdmin.createHAGroupStoreRecordInZooKeeper(peerHAGroupStoreRecord);
         }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
index fdf64dd2df..377bdf69d7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
@@ -17,7 +17,7 @@
  */
 package org.apache.phoenix.end2end.index;
 
-import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
 import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
 import static org.junit.Assert.assertEquals;
@@ -75,7 +75,7 @@ public class IndexRegionObserverMutationBlockingIT extends BaseTest {
 
     @Before
     public void setUp() throws Exception {
-        haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_NAMESPACE);
+        haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
         Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
         String zkUrl = getLocalZkUrl(config);
         String peerZkUrl = CLUSTERS.getZkUrl2();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java
new file mode 100644
index 0000000000..32239e3887
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java
@@ -0,0 +1,644 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.HAGroupStoreTestUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for HA Group State Change subscription functionality.
+ * Tests the new subscription system where HAGroupStoreClient directly manages
+ * subscriptions and HAGroupStoreManager acts as a passthrough.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class HAGroupStateSubscriptionIT extends BaseTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStateSubscriptionIT.class);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private PhoenixHAAdmin haAdmin;
+    private PhoenixHAAdmin peerHaAdmin;
+    private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 2000L;
+    private String zkUrl;
+    private String peerZKUrl;
+    private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS = new HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        CLUSTERS.start();
+    }
+
+    @Before
+    public void before() throws Exception {
+        haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+        zkUrl = getLocalZkUrl(config);
+        this.peerZKUrl = CLUSTERS.getZkUrl2();
+        peerHaAdmin = new PhoenixHAAdmin(peerZKUrl, config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+
+        // Clean up existing HAGroupStoreRecords
+        try {
+            List<String> haGroupNames = HAGroupStoreClient.getHAGroupNames(zkUrl);
+            for (String haGroupName : haGroupNames) {
+                haAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName));
+                peerHaAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName));
+            }
+
+        } catch (Exception e) {
+            // Ignore cleanup errors
+        }
+        // Remove any existing entries in the system table
+        HAGroupStoreTestUtil.deleteAllHAGroupRecordsInSystemTable(zkUrl);
+
+        // Insert a HAGroupStoreRecord into the system table
+        HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), zkUrl, peerZKUrl,
+                ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, null);
+    }
+
+    // ========== Multi-Cluster & Basic Subscription Tests ==========
+
+    @Test
+    public void testDifferentTargetStatesPerCluster() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Track notifications
+        AtomicInteger localNotifications = new AtomicInteger(0);
+        AtomicInteger peerNotifications = new AtomicInteger(0);
+        AtomicReference<ClusterType> lastLocalClusterType = new AtomicReference<>();
+        AtomicReference<ClusterType> lastPeerClusterType = new AtomicReference<>();
+
+        // Create listeners for different target states
+        HAGroupStateListener localListener = (groupName, toState, modifiedTime, clusterType) -> {
+            if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == ClusterType.LOCAL) {
+                localNotifications.incrementAndGet();
+                lastLocalClusterType.set(clusterType);
+                LOGGER.info("Local target state listener called: {} on {}", toState, clusterType);
+            }
+        };
+
+        HAGroupStateListener peerListener = (groupName, toState, modifiedTime, clusterType) -> {
+            if (toState == HAGroupState.ACTIVE_NOT_IN_SYNC && clusterType == ClusterType.PEER) {
+                peerNotifications.incrementAndGet();
+                lastPeerClusterType.set(clusterType);
+                LOGGER.info("Peer target state listener called: {} on {}", toState, clusterType);
+            }
+        };
+
+        // Subscribe to different target states on different clusters
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE, ClusterType.LOCAL, localListener);
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_NOT_IN_SYNC, ClusterType.PEER, peerListener);
+
+        // Trigger transition to STANDBY_TO_ACTIVE on LOCAL cluster
+        HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.STANDBY_TO_ACTIVE);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 
0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Trigger transition to ACTIVE_NOT_IN_SYNC on PEER cluster
+        HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC);
+        peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Verify no cross-cluster triggering
+        assertEquals("Local cluster should receive its target state 
notification", 1, localNotifications.get());
+        assertEquals("Peer cluster should receive its target state 
notification", 1, peerNotifications.get());
+        assertEquals("Local notification should have LOCAL cluster type", 
ClusterType.LOCAL, lastLocalClusterType.get());
+        assertEquals("Peer notification should have PEER cluster type", 
ClusterType.PEER, lastPeerClusterType.get());
+
+    }
+
+    @Test
+    public void testUnsubscribeSpecificCluster() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Track notifications
+        AtomicInteger totalNotifications = new AtomicInteger(0);
+        AtomicReference<ClusterType> lastClusterType = new AtomicReference<>();
+
+        HAGroupStateListener listener = (groupName, toState, modifiedTime, 
clusterType) -> {
+            if (toState == HAGroupState.STANDBY) {
+                totalNotifications.incrementAndGet();
+                lastClusterType.set(clusterType);
+                LOGGER.info("Listener called: {} on {}", toState, clusterType);
+            }
+        };
+
+        // Subscribe to same target state on both clusters
+        manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.LOCAL, listener);
+        manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.PEER, listener);
+
+        // Unsubscribe from LOCAL cluster only
+        manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.LOCAL, listener);
+
+        // Trigger transition to STANDBY on LOCAL → should NOT call listener
+        HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.STANDBY);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 
0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        assertEquals("Should receive no notifications from LOCAL cluster", 0, 
totalNotifications.get());
+
+        // Trigger transition to STANDBY on PEER → should call listener
+        HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.STANDBY);
+        peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        assertEquals("Should receive notification only from PEER cluster", 1, 
totalNotifications.get());
+        assertEquals("Notification should be from PEER cluster", 
ClusterType.PEER, lastClusterType.get());
+
+    }
+
+    // ========== Multiple Listeners Tests ==========
+
+    @Test
+    public void testMultipleListenersMultipleClusters() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Track notifications from multiple listeners
+        AtomicInteger listener1LocalNotifications = new AtomicInteger(0);
+        AtomicInteger listener2LocalNotifications = new AtomicInteger(0);
+        AtomicInteger listener1PeerNotifications = new AtomicInteger(0);
+        AtomicInteger listener2PeerNotifications = new AtomicInteger(0);
+
+        HAGroupStateListener listener1 = (groupName, toState, modifiedTime, 
clusterType) -> {
+            if (toState == HAGroupState.DEGRADED_STANDBY) {
+                if (clusterType == ClusterType.LOCAL) {
+                    listener1LocalNotifications.incrementAndGet();
+                } else {
+                    listener1PeerNotifications.incrementAndGet();
+                }
+                LOGGER.info("Listener1 called: {} on {}", toState, 
clusterType);
+            }
+        };
+
+        HAGroupStateListener listener2 = (groupName, toState, modifiedTime, 
clusterType) -> {
+            if (toState == HAGroupState.DEGRADED_STANDBY) {
+                if (clusterType == ClusterType.LOCAL) {
+                    listener2LocalNotifications.incrementAndGet();
+                } else {
+                    listener2PeerNotifications.incrementAndGet();
+                }
+                LOGGER.info("Listener2 called: {} on {}", toState, 
clusterType);
+            }
+        };
+
+        // Register multiple listeners for same target state on both clusters
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.DEGRADED_STANDBY, ClusterType.LOCAL, listener1);
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.DEGRADED_STANDBY, ClusterType.LOCAL, listener2);
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.DEGRADED_STANDBY, ClusterType.PEER, listener1);
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.DEGRADED_STANDBY, ClusterType.PEER, listener2);
+
+        // Trigger transition to DEGRADED_STANDBY on LOCAL
+        HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.DEGRADED_STANDBY);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 
0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Trigger transition to DEGRADED_STANDBY on PEER
+        HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.DEGRADED_STANDBY);
+        peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Verify all listeners called for each cluster
+        assertEquals("Listener1 should receive LOCAL notification", 1, 
listener1LocalNotifications.get());
+        assertEquals("Listener2 should receive LOCAL notification", 1, 
listener2LocalNotifications.get());
+        assertEquals("Listener1 should receive PEER notification", 1, 
listener1PeerNotifications.get());
+        assertEquals("Listener2 should receive PEER notification", 1, 
listener2PeerNotifications.get());
+
+    }
+
+    @Test
+    public void testSameListenerDifferentTargetStates() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Track which target states were reached
+        AtomicInteger stateANotifications = new AtomicInteger(0);
+        AtomicInteger stateBNotifications = new AtomicInteger(0);
+        AtomicReference<ClusterType> lastStateAClusterType = new 
AtomicReference<>();
+        AtomicReference<ClusterType> lastStateBClusterType = new 
AtomicReference<>();
+
+        HAGroupStateListener sharedListener = (groupName, toState, 
modifiedTime, clusterType) -> {
+            if (toState == HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY && 
clusterType == ClusterType.LOCAL) {
+                stateANotifications.incrementAndGet();
+                lastStateAClusterType.set(clusterType);
+                LOGGER.info("Shared listener - Target State A: {} on {}", 
toState, clusterType);
+            } else if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType 
== ClusterType.PEER) {
+                stateBNotifications.incrementAndGet();
+                lastStateBClusterType.set(clusterType);
+                LOGGER.info("Shared listener - Target State B: {} on {}", 
toState, clusterType);
+            }
+        };
+
+        // Register same listener for different target states on different 
clusters
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, ClusterType.LOCAL, sharedListener);
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, ClusterType.PEER, sharedListener);
+
+        // Trigger target state A on LOCAL
+        HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 
0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Trigger target state B on PEER
+        HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+        peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Verify listener called for each appropriate target state/cluster 
combination
+        assertEquals("Should receive target state A notification", 1, 
stateANotifications.get());
+        assertEquals("Should receive target state B notification", 1, 
stateBNotifications.get());
+        assertEquals("Target state A should be from LOCAL cluster", 
ClusterType.LOCAL, lastStateAClusterType.get());
+        assertEquals("Target state B should be from PEER cluster", 
ClusterType.PEER, lastStateBClusterType.get());
+
+    }
+
+    // ========== Edge Cases & Error Handling ==========
+
+    @Test
+    public void testSubscriptionToNonExistentHAGroup() throws Exception {
+        String nonExistentHAGroup = "nonExistentGroup_" + 
testName.getMethodName();
+        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        HAGroupStateListener listener = (groupName, toState, modifiedTime, 
clusterType) -> {
+            // Should not be called
+        };
+
+        // Try to subscribe to non-existent HA group
+        try {
+            manager.subscribeToTargetState(nonExistentHAGroup, 
HAGroupState.STANDBY, ClusterType.LOCAL, listener);
+            fail("Expected IOException for non-existent HA group");
+        } catch (IOException e) {
+            assertTrue("Exception should mention the HA group name", 
e.getMessage().contains(nonExistentHAGroup));
+            LOGGER.info("Correctly caught exception for non-existent HA group: 
{}", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testListenerExceptionIsolation() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Track notifications
+        AtomicInteger goodListener1Notifications = new AtomicInteger(0);
+        AtomicInteger goodListener2Notifications = new AtomicInteger(0);
+        AtomicInteger badListenerCalls = new AtomicInteger(0);
+
+        HAGroupStateListener goodListener1 = (groupName, toState, 
modifiedTime, clusterType) -> {
+            if (toState == HAGroupState.ACTIVE_IN_SYNC) {
+                goodListener1Notifications.incrementAndGet();
+                LOGGER.info("Good listener 1 called: {} on {}", toState, 
clusterType);
+            }
+        };
+
+        HAGroupStateListener badListener = (groupName, toState, modifiedTime, 
clusterType) -> {
+            if (toState == HAGroupState.ACTIVE_IN_SYNC) {
+                badListenerCalls.incrementAndGet();
+                LOGGER.info("Bad listener called, about to throw exception");
+                throw new RuntimeException("Test exception from bad listener");
+            }
+        };
+
+        HAGroupStateListener goodListener2 = (groupName, toState, 
modifiedTime, clusterType) -> {
+            if (toState == HAGroupState.ACTIVE_IN_SYNC) {
+                goodListener2Notifications.incrementAndGet();
+                LOGGER.info("Good listener 2 called: {} on {}", toState, 
clusterType);
+            }
+        };
+
+        // Register listeners - bad listener in the middle
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, goodListener1);
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, badListener);
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, goodListener2);
+
+        // Trigger transition to target state
+        HAGroupStoreRecord transitionRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
transitionRecord, 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Verify all listeners were called despite exception in bad listener
+        assertEquals("Good listener 1 should be called", 1, 
goodListener1Notifications.get());
+        assertEquals("Good listener 2 should be called", 1, 
goodListener2Notifications.get());
+        assertEquals("Bad listener should be called", 1, 
badListenerCalls.get());
+    }
+
+    // ========== Performance & Concurrency Tests ==========
+
+    @Test
+    public void testConcurrentMultiClusterSubscriptions() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        final int threadCount = 10;
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch completionLatch = new CountDownLatch(threadCount);
+        final AtomicInteger successfulSubscriptions = new AtomicInteger(0);
+
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+        // Create concurrent subscription tasks
+        for (int i = 0; i < threadCount; i++) {
+            final int threadIndex = i;
+            executor.submit(() -> {
+                try {
+                    startLatch.await(); // Wait for all threads to be ready
+
+                    HAGroupStateListener listener = (groupName, toState, 
modifiedTime, clusterType) -> {
+                        LOGGER.debug("Thread {} listener called: {} on {}", 
threadIndex, toState, clusterType);
+                    };
+
+                    // Half subscribe to LOCAL, half to PEER
+                    ClusterType clusterType = (threadIndex % 2 == 0) ? 
ClusterType.LOCAL : ClusterType.PEER;
+
+                    manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, clusterType, listener);
+                    successfulSubscriptions.incrementAndGet();
+
+                    // Also test unsubscribe
+                    manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, clusterType, listener);
+
+                } catch (Exception e) {
+                    LOGGER.error("Thread {} failed", threadIndex, e);
+                } finally {
+                    completionLatch.countDown();
+                }
+            });
+        }
+
+        // Start all threads simultaneously
+        startLatch.countDown();
+
+        // Wait for completion
+        assertTrue("All threads should complete within timeout", 
completionLatch.await(30, TimeUnit.SECONDS));
+        assertEquals("All threads should successfully subscribe", threadCount, 
successfulSubscriptions.get());
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testHighFrequencyMultiClusterChanges() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Track notifications
+        AtomicInteger localNotifications = new AtomicInteger(0);
+        AtomicInteger peerNotifications = new AtomicInteger(0);
+
+        HAGroupStateListener listener = (groupName, toState, modifiedTime, 
clusterType) -> {
+            if (clusterType == ClusterType.LOCAL) {
+                localNotifications.incrementAndGet();
+            } else {
+                peerNotifications.incrementAndGet();
+            }
+            LOGGER.debug("High frequency listener: {} on {}", toState, 
clusterType);
+        };
+
+        // Subscribe to target state on both clusters
+        manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.LOCAL, listener);
+        manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.PEER, listener);
+
+        // Rapidly alternate state changes on both clusters
+        final int changeCount = 5;
+        HAGroupStoreRecord initialPeerRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.DEGRADED_STANDBY_FOR_WRITER);
+        peerHaAdmin.createHAGroupStoreRecordInZooKeeper(initialPeerRecord);
+
+        for (int i = 0; i < changeCount; i++) {
+            // Change local cluster
+            HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", 
haGroupName,
+                    (i % 2 == 0) ? HAGroupState.STANDBY : 
HAGroupState.DEGRADED_STANDBY_FOR_READER);
+            haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
localRecord, -1);
+
+            // Change peer cluster
+            HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", 
haGroupName,
+                    (i % 2 == 0) ? HAGroupState.STANDBY : 
HAGroupState.DEGRADED_STANDBY_FOR_WRITER);
+            peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
peerRecord, -1);
+
+            // Small delay between changes
+            Thread.sleep(500);
+        }
+
+        // Final wait for all events to propagate
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Verify transitions detected on both clusters
+        // Expected: 3 transitions to STANDBY state (i=0,2,4 → STANDBY)
+        assertEquals("Should detect exactly 3 local cluster transitions to 
STANDBY", 3, localNotifications.get());
+        assertEquals("Should detect exactly 3 peer cluster transitions to 
STANDBY", 3, peerNotifications.get());
+
+        LOGGER.info("Detected {} local and {} peer notifications", 
localNotifications.get(), peerNotifications.get());
+
+    }
+
+    // ========== Cleanup & Resource Management Tests ==========
+
+    @Test
+    public void testSubscriptionCleanupPerCluster() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+        // Track notifications to verify functionality
+        AtomicInteger localActiveNotifications = new AtomicInteger(0);
+        AtomicInteger peerActiveNotifications = new AtomicInteger(0);
+        AtomicInteger localStandbyNotifications = new AtomicInteger(0);
+        AtomicInteger peerStandbyNotifications = new AtomicInteger(0);
+
+        // Create listeners that track which ones are called
+        HAGroupStateListener listener1 = (groupName, toState, modifiedTime, 
clusterType) -> {
+            if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == 
ClusterType.LOCAL) {
+                localActiveNotifications.incrementAndGet();
+                LOGGER.info("Listener1 LOCAL ACTIVE_IN_SYNC: {}", toState);
+            }
+        };
+
+        HAGroupStateListener listener2 = (groupName, toState, modifiedTime, 
clusterType) -> {
+            if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == 
ClusterType.LOCAL) {
+                localActiveNotifications.incrementAndGet();
+                LOGGER.info("Listener2 LOCAL ACTIVE_IN_SYNC: {}", toState);
+            } else if (toState == HAGroupState.STANDBY_TO_ACTIVE && 
clusterType == ClusterType.PEER) {
+                peerActiveNotifications.incrementAndGet();
+                LOGGER.info("Listener2 PEER STANDBY_TO_ACTIVE: {}", toState);
+            }
+        };
+
+        HAGroupStateListener listener3 = (groupName, toState, modifiedTime, 
clusterType) -> {
+            if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == 
ClusterType.PEER) {
+                peerActiveNotifications.incrementAndGet();
+                LOGGER.info("Listener3 PEER STANDBY_TO_ACTIVE: {}", toState);
+            }
+        };
+
+        HAGroupStateListener listener4 = (groupName, toState, modifiedTime, 
clusterType) -> {
+            if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == 
ClusterType.LOCAL) {
+                localStandbyNotifications.incrementAndGet();
+                LOGGER.info("Listener4 LOCAL ACTIVE_IN_SYNC: {}", toState);
+            } else if (toState == HAGroupState.STANDBY_TO_ACTIVE && 
clusterType == ClusterType.PEER) {
+                peerStandbyNotifications.incrementAndGet();
+                LOGGER.info("Listener4 PEER STANDBY_TO_ACTIVE: {}", toState);
+            }
+        };
+
+        // Subscribe listeners to both clusters for target states
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener1);
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener2);
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener2);
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener3);
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener4);
+        manager.subscribeToTargetState(haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener4);
+
+        // Test initial functionality - trigger ACTIVE_IN_SYNC on LOCAL
+        HAGroupStoreRecord localActiveRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
localActiveRecord, 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Should have 2 notifications for LOCAL ACTIVE_IN_SYNC (listener1 
+ listener2)
+        assertEquals("Should have 2 LOCAL ACTIVE_IN_SYNC notifications 
initially", 2, localActiveNotifications.get());
+
+        // Test initial functionality - trigger STANDBY_TO_ACTIVE on PEER
+        HAGroupStoreRecord peerActiveRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.STANDBY_TO_ACTIVE);
+        peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerActiveRecord);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Should have 2 notifications for PEER STANDBY_TO_ACTIVE (listener2 + 
listener3)
+        assertEquals("Should have 2 PEER STANDBY_TO_ACTIVE notifications 
initially", 2, peerActiveNotifications.get());
+
+        // Reset counters for cleanup testing
+        localActiveNotifications.set(0);
+        peerActiveNotifications.set(0);
+
+        // Unsubscribe selectively
+        manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener1);
+        manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener2);
+        manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener4);
+
+        // Test after partial unsubscribe - trigger ACTIVE_IN_SYNC on LOCAL 
again by first changing to some other state.
+        HAGroupStoreRecord localActiveRecord2 = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
localActiveRecord2, 1);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        HAGroupStoreRecord localActiveRecord3 = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
localActiveRecord3, 2);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Should have only 1 notification for LOCAL ACTIVE_IN_SYNC (only 
listener2 remains)
+        assertEquals("Should have 1 LOCAL ACTIVE_IN_SYNC notification after 
partial unsubscribe", 1, localActiveNotifications.get());
+
+        // Test after partial unsubscribe - trigger STANDBY_TO_ACTIVE on PEER 
again
+        HAGroupStoreRecord peerActiveRecord2 = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.STANDBY);
+        peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
peerActiveRecord2, 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        HAGroupStoreRecord peerActiveRecord3 = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.STANDBY_TO_ACTIVE);
+        peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
peerActiveRecord3, 1);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Should have only 1 notification for PEER STANDBY_TO_ACTIVE (only 
listener3 still increments this counter)
+        assertEquals("Should have 1 PEER STANDBY_TO_ACTIVE notification after 
partial unsubscribe", 1, peerActiveNotifications.get());
+
+        // Reset counters again
+        localActiveNotifications.set(0);
+        peerActiveNotifications.set(0);
+
+        // Unsubscribe all remaining listeners
+        manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener2);
+        manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener3);
+        manager.unsubscribeFromTargetState(haGroupName, 
HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener4);
+
+        // Test after complete unsubscribe - trigger ACTIVE_NOT_IN_SYNC on 
both clusters
+        HAGroupStoreRecord localActiveRecord4 = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
localActiveRecord4, 3);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        HAGroupStoreRecord peerActiveRecord4 = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC);
+        peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
peerActiveRecord4, 2);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Should have no notifications after complete unsubscribe
+        assertEquals("Should have 0 LOCAL ACTIVE_NOT_IN_SYNC notifications 
after complete unsubscribe", 0, localActiveNotifications.get());
+        assertEquals("Should have 0 PEER ACTIVE_NOT_IN_SYNC notifications 
after complete unsubscribe", 0, peerActiveNotifications.get());
+
+        // Test that new subscriptions still work properly
+        AtomicInteger newSubscriptionNotifications = new AtomicInteger(0);
+        HAGroupStateListener newTestListener = (groupName, toState, 
modifiedTime, clusterType) -> {
+            if (toState == HAGroupState.STANDBY && clusterType == 
ClusterType.LOCAL) {
+                newSubscriptionNotifications.incrementAndGet();
+                LOGGER.info("New subscription triggered: {} on {} at {}", 
toState, clusterType, modifiedTime);
+            }
+        };
+
+        // Subscribe with new test listener
+        manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, 
ClusterType.LOCAL, newTestListener);
+
+        // Trigger STANDBY state and verify new subscription works
+        HAGroupStoreRecord standbyRecord = new HAGroupStoreRecord("1.0", 
haGroupName, HAGroupState.STANDBY);
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
standbyRecord, 4);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Expected: exactly 1 notification for the new subscription
+        assertEquals("New subscription should receive exactly 1 notification", 
1, newSubscriptionNotifications.get());
+
+        LOGGER.info("Subscription cleanup test completed successfully with {} 
notifications from new subscription",
+                   newSubscriptionNotifications.get());
+
+    }
+
+    /**
+     * Helper method to access private subscription maps via reflection
+     */
+    @SuppressWarnings("unchecked")
+    private ConcurrentHashMap<String, 
CopyOnWriteArraySet<HAGroupStateListener>> 
getSubscriptionMap(HAGroupStoreClient client, String fieldName) throws 
Exception {
+        Field field = HAGroupStoreClient.class.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return (ConcurrentHashMap<String, 
CopyOnWriteArraySet<HAGroupStateListener>>) field.get(client);
+    }
+}
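
For readers skimming the patch, a minimal usage sketch (not part of the commit) of the
subscription API that the tests above exercise. The class name, HA group name, and
listener body are illustrative assumptions; HAGroupStoreManager, HAGroupStateListener,
ClusterType, and HAGroupState are the types used by this change.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.jdbc.ClusterType;
    import org.apache.phoenix.jdbc.HAGroupStateListener;
    import org.apache.phoenix.jdbc.HAGroupStoreManager;
    import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;

    public class FailoverHookSketch {
        // Hypothetical hook: react when the local cluster reaches STANDBY_TO_ACTIVE.
        public void register(Configuration conf, String haGroupName) throws Exception {
            HAGroupStoreManager manager = HAGroupStoreManager.getInstance(conf);
            HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) -> {
                // Called once per observed transition into the subscribed target state.
                onPromotion(groupName);
            };
            manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE,
                    ClusterType.LOCAL, listener);
            // Unsubscribe with the same (group, state, cluster, listener) tuple when done:
            // manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE,
            //         ClusterType.LOCAL, listener);
        }

        private void onPromotion(String groupName) {
            // Application-specific promotion logic goes here.
        }
    }

As the unsubscribe tests above rely on, removing a subscription requires the same group
name, target state, cluster type, and listener instance that were used to subscribe.
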
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index 5a062b3305..9e8087a57f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -40,6 +40,7 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -52,7 +53,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
-import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
@@ -91,8 +92,8 @@ public class HAGroupStoreClientIT extends BaseTest {
 
     @Before
     public void before() throws Exception {
-        haAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), 
ZK_CONSISTENT_HA_NAMESPACE);
-        peerHaAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), 
ZK_CONSISTENT_HA_NAMESPACE);
+        haAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), 
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+        peerHaAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), 
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
         
haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
         
peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
         zkUrl = getLocalZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration());
@@ -429,6 +430,8 @@ public class HAGroupStoreClientIT extends BaseTest {
         // The record should be automatically rebuilt from System Table as it 
is not in ZK
         assertNotNull(currentRecord);
         assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, 
currentRecord.getHAGroupState());
+        // The record should have a timestamp
+        assertNotNull(currentRecord.getLastSyncStateTimeInMs());
 
         record1 = new HAGroupStoreRecord("v1.0", haGroupName,
                 HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC_TO_STANDBY);
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
index 34d143e5e6..c9e17294c5 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
@@ -17,13 +17,16 @@
  */
 package org.apache.phoenix.jdbc;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.HAGroupStoreTestUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.zookeeper.data.Stat;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -31,18 +34,25 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
+import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
+import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
+import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_SESSION_TIMEOUT_MULTIPLIER;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
 import static 
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Integration tests for {@link HAGroupStoreManager}.
@@ -63,13 +73,14 @@ public class HAGroupStoreManagerIT extends BaseTest {
     public static synchronized void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
         props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+        props.put(ZK_SESSION_TIMEOUT, String.valueOf(30*1000));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
         CLUSTERS.start();
     }
 
     @Before
     public void before() throws Exception {
-        haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_NAMESPACE);
+        haAdmin = new PhoenixHAAdmin(config, 
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
         zkUrl = getLocalZkUrl(config);
         this.peerZKUrl = CLUSTERS.getZkUrl2();
 
@@ -147,7 +158,6 @@ public class HAGroupStoreManagerIT extends BaseTest {
         // Should be present
         assertTrue(recordOpt.isPresent());
 
-
         // Delete record from System Table
         HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, 
zkUrl);
         // Delete record from ZK
@@ -166,12 +176,90 @@ public class HAGroupStoreManagerIT extends BaseTest {
         Optional<HAGroupStoreRecord>  retrievedOpt = 
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
         assertTrue(retrievedOpt.isPresent());
 
-        // Record for comparison
-        HAGroupStoreRecord record = new HAGroupStoreRecord(
-                HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, 
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
+        // Get MTime from HAAdmin for equality verification below.
+        Pair<HAGroupStoreRecord, Stat> currentRecordAndStat = 
haAdmin.getHAGroupStoreRecordInZooKeeper(haGroupName);
+
+        // Complete object comparison field-by-field
+        assertEquals(haGroupName, retrievedOpt.get().getHaGroupName());
+        assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, 
retrievedOpt.get().getHAGroupState());
+        Long lastSyncStateTimeInMs = 
retrievedOpt.get().getLastSyncStateTimeInMs();
+        Long mtime = currentRecordAndStat.getRight().getMtime();
+        // Allow a small margin of error
+        assertTrue(Math.abs(lastSyncStateTimeInMs - mtime) <= 1);
+        assertEquals(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, 
retrievedOpt.get().getProtocolVersion());
+    }
+
+    @Test
+    public void testGetPeerHAGroupStoreRecord() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+
+        // Initially, peer record should not be present
+        Optional<HAGroupStoreRecord> peerRecordOpt = 
haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+        assertFalse(peerRecordOpt.isPresent());
+
+        // Create a peer HAAdmin to create records in peer cluster
+        PhoenixHAAdmin peerHaAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
+                ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+
+        try {
+            // Create a HAGroupStoreRecord in the peer cluster
+            HAGroupStoreRecord peerRecord = new HAGroupStoreRecord(
+                    "1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.STANDBY);
+
+            peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+            Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+            // Now peer record should be present
+            peerRecordOpt = 
haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+            assertTrue(peerRecordOpt.isPresent());
+
+            // Verify the peer record details
+            HAGroupStoreRecord retrievedPeerRecord = peerRecordOpt.get();
+            assertEquals(haGroupName, retrievedPeerRecord.getHaGroupName());
+            assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, 
retrievedPeerRecord.getHAGroupState());
+            assertEquals(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, 
retrievedPeerRecord.getProtocolVersion());
+
+            // Delete peer record
+            peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName);
+            Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+            // Peer record should no longer be present
+            peerRecordOpt = 
haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+            assertFalse(peerRecordOpt.isPresent());
+
+            // Create peer record again with different state
+            HAGroupStoreRecord newPeerRecord = new HAGroupStoreRecord(
+                    "1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER);
+
+            peerHaAdmin.createHAGroupStoreRecordInZooKeeper(newPeerRecord);
+            Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+            // Verify the updated peer record
+            peerRecordOpt = 
haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+            assertTrue(peerRecordOpt.isPresent());
+            
assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER,
+                    peerRecordOpt.get().getHAGroupState());
+
+        } finally {
+            // Clean up peer record
+            try {
+                peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName);
+            } catch (Exception e) {
+                // Ignore cleanup errors
+            }
+            peerHaAdmin.close();
+        }
+    }
 
-        // Complete object comparison instead of field-by-field
-        assertEquals(record, retrievedOpt.get());
+    @Test
+    public void testGetPeerHAGroupStoreRecordWhenHAGroupNotInSystemTable() 
throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+
+        // Try to get peer record for an HA group that doesn't exist in system 
table
+        Optional<HAGroupStoreRecord> peerRecordOpt = 
haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+        assertFalse("Peer record should not be present for non-existent HA 
group", peerRecordOpt.isPresent());
     }
 
     @Test
@@ -214,9 +302,13 @@ public class HAGroupStoreManagerIT extends BaseTest {
         conf.set(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "false");
         conf.set(HConstants.ZOOKEEPER_QUORUM, getLocalZkUrl(config));
 
-        HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+        // Set the HAGroupStoreManager instance to null via reflection to 
force recreation
+        Field field = 
HAGroupStoreManager.class.getDeclaredField("haGroupStoreManagerInstance");
+        field.setAccessible(true);
+        field.set(null, null);
 
-        // Create HAGroupStoreRecord with ACTIVE_TO_STANDBY role
+        HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+        // Create HAGroupStoreRecord with ACTIVE_IN_SYNC_TO_STANDBY role
         HAGroupStoreRecord transitionRecord = new HAGroupStoreRecord(
                 "1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
 
@@ -225,6 +317,9 @@ public class HAGroupStoreManagerIT extends BaseTest {
 
         // Mutations should not be blocked even with ACTIVE_TO_STANDBY role
         assertFalse(haGroupStoreManager.isMutationBlocked(haGroupName));
+
+        // Set the HAGroupStoreManager instance back to null via reflection to 
force recreation for other tests
+        field.set(null, null);
     }
 
     @Test
@@ -248,29 +343,43 @@ public class HAGroupStoreManagerIT extends BaseTest {
         assertTrue(updatedRecordOpt.isPresent());
         HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
         assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, 
updatedRecord.getHAGroupState());
+        assertNotNull(updatedRecord.getLastSyncStateTimeInMs());
+
+        // Set the HA group status to store and forward again and verify that
+        // getLastSyncStateTimeInMs stays the same (still ACTIVE_NOT_IN_SYNC).
+        // The time should only update on the ACTIVE_IN_SYNC to
+        // ACTIVE_NOT_IN_SYNC transition.
+        haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        Optional<HAGroupStoreRecord> updatedRecordOpt2 = 
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(updatedRecordOpt2.isPresent());
+        HAGroupStoreRecord updatedRecord2 = updatedRecordOpt2.get();
+        assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, 
updatedRecord2.getHAGroupState());
+        assertEquals(updatedRecord.getLastSyncStateTimeInMs(), 
updatedRecord2.getLastSyncStateTimeInMs());
     }
 
     @Test
-    public void testSetHAGroupStatusRecordToSync() throws Exception {
+    public void testSetHAGroupStatusToSync() throws Exception {
         String haGroupName = testName.getMethodName();
         HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
 
-        // Create an initial HAGroupStoreRecord with ACTIVE_NOT_IN_SYNC status
-        HAGroupStoreRecord initialRecord = new HAGroupStoreRecord(
-                "1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
-
-        haAdmin.createHAGroupStoreRecordInZooKeeper(initialRecord);
-        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        // Initial record should be present in ACTIVE_NOT_IN_SYNC status
+        HAGroupStoreRecord initialRecord = 
haGroupStoreManager.getHAGroupStoreRecord(haGroupName).orElse(null);
+        assertNotNull(initialRecord);
+        assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, 
initialRecord.getHAGroupState());
+        assertNotNull(initialRecord.getLastSyncStateTimeInMs());
 
-        // Set the HA group status to sync (ACTIVE)
-        haGroupStoreManager.setHAGroupStatusRecordToSync(haGroupName);
+        // Set the HA group status to sync (ACTIVE_IN_SYNC); we must first wait
+        // for ZK_SESSION_TIMEOUT * ZK_SESSION_TIMEOUT_MULTIPLIER
+        Thread.sleep((long) Math.ceil(config.getLong(ZK_SESSION_TIMEOUT, 
DEFAULT_ZK_SESSION_TIMEOUT)
+                * ZK_SESSION_TIMEOUT_MULTIPLIER));
+        haGroupStoreManager.setHAGroupStatusToSync(haGroupName);
         Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
 
-        // Verify the status was updated to ACTIVE
+        // Verify the state was updated to ACTIVE_IN_SYNC
         Optional<HAGroupStoreRecord> updatedRecordOpt = 
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
         assertTrue(updatedRecordOpt.isPresent());
         HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
-        assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, 
updatedRecord.getClusterRole());
+        assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, 
updatedRecord.getHAGroupState());
+        assertNull(updatedRecord.getLastSyncStateTimeInMs());
     }
 
     @Test
@@ -350,4 +459,130 @@ public class HAGroupStoreManagerIT extends BaseTest {
 
     }
 
+    @Test
+    public void testSetReaderToDegraded() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+
+        // Update the auto-created record to STANDBY state for testing
+        HAGroupStoreRecord standbyRecord = new HAGroupStoreRecord(
+                "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY);
+
+        // Get the record to initialize ZNode from HAGroup so that we can 
artificially update it via HAAdmin
+        Optional<HAGroupStoreRecord> currentRecord = 
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(currentRecord.isPresent());
+
+        // Update via HAAdmin
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
standbyRecord, 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Call setReaderToDegraded
+        haGroupStoreManager.setReaderToDegraded(haGroupName);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Verify the status was updated to DEGRADED_STANDBY_FOR_READER
+        Optional<HAGroupStoreRecord> updatedRecordOpt = 
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(updatedRecordOpt.isPresent());
+        HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
+        
assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER, 
updatedRecord.getHAGroupState());
+
+        // Test transition from DEGRADED_STANDBY_FOR_WRITER to DEGRADED_STANDBY
+        HAGroupStoreRecord degradedWriterRecord = new HAGroupStoreRecord(
+                "1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER);
+
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
degradedWriterRecord, 2);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Call setReaderToDegraded again
+        haGroupStoreManager.setReaderToDegraded(haGroupName);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Verify the status was updated to DEGRADED_STANDBY
+        updatedRecordOpt = 
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(updatedRecordOpt.isPresent());
+        updatedRecord = updatedRecordOpt.get();
+        assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, 
updatedRecord.getHAGroupState());
+    }
+
+    @Test
+    public void testSetReaderToHealthy() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+
+        // Get the record to initialize ZNode from HAGroup so that we can 
artificially update it via HAAdmin
+        Optional<HAGroupStoreRecord> currentRecord = 
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(currentRecord.isPresent());
+
+        // Update the auto-created record to DEGRADED_STANDBY_FOR_READER state 
for testing
+        HAGroupStoreRecord degradedReaderRecord = new HAGroupStoreRecord(
+                "1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER);
+
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
degradedReaderRecord, 0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Call setReaderToHealthy
+        haGroupStoreManager.setReaderToHealthy(haGroupName);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Verify the status was updated to STANDBY
+        Optional<HAGroupStoreRecord> updatedRecordOpt = 
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(updatedRecordOpt.isPresent());
+        HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
+        assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, 
updatedRecord.getHAGroupState());
+
+        // Test transition from DEGRADED_STANDBY to DEGRADED_STANDBY_FOR_WRITER
+        HAGroupStoreRecord degradedRecord = new HAGroupStoreRecord(
+                "1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY);
+
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, 
degradedRecord, 2);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Call setReaderToHealthy again
+        haGroupStoreManager.setReaderToHealthy(haGroupName);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Verify the status was updated to DEGRADED_STANDBY_FOR_WRITER
+        updatedRecordOpt = 
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(updatedRecordOpt.isPresent());
+        updatedRecord = updatedRecordOpt.get();
+        
assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER, 
updatedRecord.getHAGroupState());
+    }
+
+    @Test
+    public void testReaderStateTransitionInvalidStates() throws Exception {
+        String haGroupName = testName.getMethodName();
+        HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config);
+
+        // Get the record to initialize ZNode from HAGroup so that we can 
artificially update it via HAAdmin
+        Optional<HAGroupStoreRecord> currentRecord = 
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+        assertTrue(currentRecord.isPresent());
+
+        // Update the auto-created record to ACTIVE_IN_SYNC state (invalid for 
both operations)
+        HAGroupStoreRecord activeRecord = new HAGroupStoreRecord(
+                "1.0", haGroupName, 
HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+
+        haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, activeRecord, 
0);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        // Test setReaderToDegraded with invalid state
+        try {
+            haGroupStoreManager.setReaderToDegraded(haGroupName);
+            fail("Expected InvalidClusterRoleTransitionException for 
setReaderToDegraded with ACTIVE_IN_SYNC state");
+        } catch (InvalidClusterRoleTransitionException e) {
+            // Expected behavior
+            assertTrue("Exception should mention the invalid transition",
+                      e.getMessage().contains("ACTIVE_IN_SYNC") && 
e.getMessage().contains("DEGRADED_STANDBY_FOR_READER"));
+        }
+
+        // Test setReaderToHealthy with invalid state
+        try {
+            haGroupStoreManager.setReaderToHealthy(haGroupName);
+            fail("Expected InvalidClusterRoleTransitionException for 
setReaderToHealthy with ACTIVE_IN_SYNC state");
+        } catch (InvalidClusterRoleTransitionException e) {
+            // Expected behavior
+            assertTrue("Exception should mention the invalid transition",
+                      e.getMessage().contains("ACTIVE_IN_SYNC") && 
e.getMessage().contains("STANDBY"));
+        }
+    }
+
 }
\ No newline at end of file
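
Alongside the reader-state tests above, a minimal hypothetical sketch of how a
replication log reader could report its health through the new manager calls. The
helper class and the conditions under which it would be invoked are assumptions;
setReaderToDegraded, setReaderToHealthy, and InvalidClusterRoleTransitionException
are the API elements the tests exercise.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
    import org.apache.phoenix.jdbc.HAGroupStoreManager;

    public final class ReaderHealthReporterSketch {
        private ReaderHealthReporterSketch() { }

        // Mark the standby reader degraded, e.g. when replication log reads keep failing.
        public static void reportDegraded(Configuration conf, String haGroupName) throws Exception {
            try {
                HAGroupStoreManager.getInstance(conf).setReaderToDegraded(haGroupName);
            } catch (InvalidClusterRoleTransitionException e) {
                // The current HAGroupState (e.g. ACTIVE_IN_SYNC) does not allow this transition.
            }
        }

        // Mark the reader healthy again once reads succeed.
        public static void reportHealthy(Configuration conf, String haGroupName) throws Exception {
            try {
                HAGroupStoreManager.getInstance(conf).setReaderToHealthy(haGroupName);
            } catch (InvalidClusterRoleTransitionException e) {
                // Swallow or surface depending on the caller's policy.
            }
        }
    }
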
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
index 5e2685d03a..044a8a4925 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
@@ -41,7 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
 import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -71,8 +71,8 @@ public class PhoenixHAAdminIT extends BaseTest {
 
     @Before
     public void before() throws Exception {
-        haAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), 
ZK_CONSISTENT_HA_NAMESPACE);
-        peerHaAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), 
ZK_CONSISTENT_HA_NAMESPACE);
+        haAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), 
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+        peerHaAdmin = new 
PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), 
ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
         cleanupTestZnodes();
     }
 
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
index d15e73aca2..abc4d02a50 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
@@ -110,4 +110,19 @@ public class HAGroupStoreTestUtil {
             conn.commit();
         }
     }
-} 
\ No newline at end of file
+
+    /**
+     * Deletes all HA group records from the system table for testing purposes.
+     *
+     * @param zkUrl the ZooKeeper URL to connect to
+     * @throws SQLException if the database operation fails
+     */
+    public static void deleteAllHAGroupRecordsInSystemTable(String zkUrl) 
throws SQLException {
+        // Delete all records from System Table
+        try (PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl);
+             Statement stmt = conn.createStatement()) {
+            stmt.execute("DELETE FROM " + SYSTEM_HA_GROUP_NAME);
+            conn.commit();
+        }
+    }
+}
\ No newline at end of file
