kadirozde commented on code in PR #2244:
URL: https://github.com/apache/phoenix/pull/2244#discussion_r2245083988


##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -17,191 +17,640 @@
  */
 package org.apache.phoenix.jdbc;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
+import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.RegistryType;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.JDBCUtil;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_2;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_2;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.HA_GROUP_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.POLICY;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VERSION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ZK_URL_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ZK_URL_2;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static 
org.apache.phoenix.query.QueryServices.HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS;
+import static 
org.apache.phoenix.query.QueryServices.HA_SYNC_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.query.QueryServicesOptions
+        .DEFAULT_HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.query.QueryServicesOptions
+        .DEFAULT_HA_SYNC_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
 
 
 /**
- * Write-through cache for HAGroupStore.
+ * Main implementation of HAGroupStoreClient with peer support.
+ * Write-through cache for HAGroupStore based on {@link HAGroupStoreRecord}.
  * Uses {@link PathChildrenCache} from {@link 
org.apache.curator.framework.CuratorFramework}.
  */
 public class HAGroupStoreClient implements Closeable {
 
+    public static final String ZK_CONSISTENT_HA_NAMESPACE =
+            "phoenix" + ZKPaths.PATH_SEPARATOR + "consistentHA";
     private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS 
= 30000L;
-    private static volatile HAGroupStoreClient haGroupStoreClientInstance;
+    private static final String CACHE_TYPE_LOCAL = "LOCAL";
+    private static final String CACHE_TYPE_PEER = "PEER";
     private PhoenixHAAdmin phoenixHaAdmin;
+    private PhoenixHAAdmin peerPhoenixHaAdmin;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HAGroupStoreClient.class);
-    // Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>>
-    private final ConcurrentHashMap<ClusterRoleRecord.ClusterRole, 
ConcurrentHashMap<String, ClusterRoleRecord>> clusterRoleToCRRMap
-            = new ConcurrentHashMap<>();
-    private PathChildrenCache pathChildrenCache;
-    private volatile boolean isHealthy;
+    // Map of <ZKUrl, <HAGroupName, HAGroupStoreClientInstance>>
+    private static final Map<String, ConcurrentHashMap<String, 
HAGroupStoreClient>> instances =
+            new ConcurrentHashMap<>();
+    // HAGroupName for this instance
+    private final String haGroupName;
+    // PathChildrenCache for current cluster and HAGroupName
+    private PathChildrenCache pathChildrenCache = null;
+    // PathChildrenCache for peer cluster and HAGroupName
+    private PathChildrenCache peerPathChildrenCache = null;
+    // Whether the client is healthy
+    private volatile boolean isHealthy = false;
+    // Configuration
+    private final Configuration conf;
+    // ZK URL for the current cluster and HAGroupName
+    private String zkUrl;
+    // Peer ZK URL for peer cluster and HAGroupName
+    private String peerZKUrl = null;
+    // Peer Custom Event Listener
+    private final PathChildrenCacheListener 
peerCustomPathChildrenCacheListener;
+    // Wait time for sync mode
+    private final long waitTimeForSyncModeInMs;
+    // Wait time for store and forward mode
+    private final long waitTimeForStoreAndForwardModeInMs;
+    // Policy for the HA group
+    private HighAvailabilityPolicy policy;
+    private ClusterRole clusterRole;
+    private ClusterRole peerClusterRole;
+    private String clusterUrl;
+    private String peerClusterUrl;
+    private long clusterRoleRecordVersion;
+
 
     /**
      * Creates/gets an instance of HAGroupStoreClient.
      * Can return null instance if unable to initialize.
      *
      * @param conf configuration
+     * @param haGroupName name of the HA group. Only specified HA group is 
tracked.
+     * @param zkUrl zkUrl to use for the client. Prefer providing this 
parameter to avoid
+     *             the overhead of getting the local zkUrl from the 
configuration.
      * @return HAGroupStoreClient instance
      */
-    public static HAGroupStoreClient getInstance(Configuration conf) {
-        if (haGroupStoreClientInstance == null || 
!haGroupStoreClientInstance.isHealthy) {
+    public static HAGroupStoreClient getInstance(Configuration conf, String 
haGroupName,

Review Comment:
   Can we have two methods: getInstance (which takes an explicit zkUrl) and
   getInstanceForLocalZK (which resolves the local zkUrl from the configuration)?



##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -17,191 +17,640 @@
  */
 package org.apache.phoenix.jdbc;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
+import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.RegistryType;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.JDBCUtil;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_2;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_2;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.HA_GROUP_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.POLICY;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VERSION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ZK_URL_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ZK_URL_2;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static 
org.apache.phoenix.query.QueryServices.HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS;
+import static 
org.apache.phoenix.query.QueryServices.HA_SYNC_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.query.QueryServicesOptions
+        .DEFAULT_HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.query.QueryServicesOptions
+        .DEFAULT_HA_SYNC_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
 
 
 /**
- * Write-through cache for HAGroupStore.
+ * Main implementation of HAGroupStoreClient with peer support.
+ * Write-through cache for HAGroupStore based on {@link HAGroupStoreRecord}.
  * Uses {@link PathChildrenCache} from {@link 
org.apache.curator.framework.CuratorFramework}.
  */
 public class HAGroupStoreClient implements Closeable {
 
+    public static final String ZK_CONSISTENT_HA_NAMESPACE =
+            "phoenix" + ZKPaths.PATH_SEPARATOR + "consistentHA";
     private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS 
= 30000L;
-    private static volatile HAGroupStoreClient haGroupStoreClientInstance;
+    private static final String CACHE_TYPE_LOCAL = "LOCAL";
+    private static final String CACHE_TYPE_PEER = "PEER";
     private PhoenixHAAdmin phoenixHaAdmin;
+    private PhoenixHAAdmin peerPhoenixHaAdmin;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HAGroupStoreClient.class);
-    // Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>>
-    private final ConcurrentHashMap<ClusterRoleRecord.ClusterRole, 
ConcurrentHashMap<String, ClusterRoleRecord>> clusterRoleToCRRMap
-            = new ConcurrentHashMap<>();
-    private PathChildrenCache pathChildrenCache;
-    private volatile boolean isHealthy;
+    // Map of <ZKUrl, <HAGroupName, HAGroupStoreClientInstance>>
+    private static final Map<String, ConcurrentHashMap<String, 
HAGroupStoreClient>> instances =
+            new ConcurrentHashMap<>();
+    // HAGroupName for this instance
+    private final String haGroupName;
+    // PathChildrenCache for current cluster and HAGroupName
+    private PathChildrenCache pathChildrenCache = null;
+    // PathChildrenCache for peer cluster and HAGroupName
+    private PathChildrenCache peerPathChildrenCache = null;
+    // Whether the client is healthy
+    private volatile boolean isHealthy = false;
+    // Configuration
+    private final Configuration conf;
+    // ZK URL for the current cluster and HAGroupName
+    private String zkUrl;
+    // Peer ZK URL for peer cluster and HAGroupName
+    private String peerZKUrl = null;
+    // Peer Custom Event Listener
+    private final PathChildrenCacheListener 
peerCustomPathChildrenCacheListener;
+    // Wait time for sync mode
+    private final long waitTimeForSyncModeInMs;
+    // Wait time for store and forward mode
+    private final long waitTimeForStoreAndForwardModeInMs;
+    // Policy for the HA group
+    private HighAvailabilityPolicy policy;
+    private ClusterRole clusterRole;
+    private ClusterRole peerClusterRole;
+    private String clusterUrl;
+    private String peerClusterUrl;
+    private long clusterRoleRecordVersion;
+
 
     /**
      * Creates/gets an instance of HAGroupStoreClient.
      * Can return null instance if unable to initialize.
      *
      * @param conf configuration
+     * @param haGroupName name of the HA group. Only specified HA group is 
tracked.
+     * @param zkUrl zkUrl to use for the client. Prefer providing this 
parameter to avoid
+     *             the overhead of getting the local zkUrl from the 
configuration.
      * @return HAGroupStoreClient instance
      */
-    public static HAGroupStoreClient getInstance(Configuration conf) {
-        if (haGroupStoreClientInstance == null || 
!haGroupStoreClientInstance.isHealthy) {
+    public static HAGroupStoreClient getInstance(Configuration conf, String 
haGroupName,
+            String zkUrl) throws SQLException {
+        Preconditions.checkNotNull(haGroupName, "haGroupName cannot be null");
+        String localZkUrl = Objects.toString(zkUrl, getLocalZkUrl(conf));
+        Preconditions.checkNotNull(localZkUrl, "zkUrl cannot be null");
+        HAGroupStoreClient result = instances.getOrDefault(localZkUrl, new 
ConcurrentHashMap<>())
+                .getOrDefault(haGroupName, null);
+        if (result == null || !result.isHealthy) {
             synchronized (HAGroupStoreClient.class) {
-                if (haGroupStoreClientInstance == null || 
!haGroupStoreClientInstance.isHealthy) {
-                    haGroupStoreClientInstance = new HAGroupStoreClient(conf, 
null);
-                    if (!haGroupStoreClientInstance.isHealthy) {
-                        haGroupStoreClientInstance.close();
-                        haGroupStoreClientInstance = null;
+                result = instances.getOrDefault(localZkUrl, new 
ConcurrentHashMap<>())
+                        .getOrDefault(haGroupName, null);
+                if (result == null || !result.isHealthy) {
+                    result = new HAGroupStoreClient(conf, null, null, 
haGroupName, zkUrl);
+                    if (!result.isHealthy) {
+                        result.close();
+                        result = null;
+                    } else {
+                        instances.putIfAbsent(localZkUrl, new 
ConcurrentHashMap<>());
+                        instances.get(localZkUrl).put(haGroupName, result);
                     }
                 }
             }
         }
-        return haGroupStoreClientInstance;
+        return result;
+    }
+
+    /**
+     * Get the list of HAGroupNames from system table.
+     * We can also get the list of HAGroupNames from the system table by 
providing the zkUrl in
+     * where clause but we need to match the formatted zkUrl with the zkUrl in 
the system table so
+     * that matching is done correctly.
+     *
+     * @param zkUrl for connecting to Table
+     * @return the list of HAGroupNames
+     * @throws SQLException in case of unexpected error
+     */
+    public static List<String> getHAGroupNames(String zkUrl) throws 
SQLException {
+        List<String> result = new ArrayList<>();
+        String queryString = String.format("SELECT %s,%s,%s FROM %s", 
HA_GROUP_NAME, ZK_URL_1,
+                ZK_URL_2, SYSTEM_HA_GROUP_NAME);
+        try (PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(
+                JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl);
+             Statement stmt = conn.createStatement();
+             ResultSet rs = stmt.executeQuery(queryString)) {
+            while (rs.next()) {
+                String zkUrl1 = rs.getString(ZK_URL_1);
+                String zkUrl2 = rs.getString(ZK_URL_2);
+                String formattedZkUrl1 = JDBCUtil.formatUrl(zkUrl1, 
RegistryType.ZK);
+                String formattedZkUrl2 = JDBCUtil.formatUrl(zkUrl2, 
RegistryType.ZK);
+                String formattedZkUrl = JDBCUtil.formatUrl(zkUrl, 
RegistryType.ZK);
+                if (StringUtils.equals(formattedZkUrl1, formattedZkUrl) ||
+                        StringUtils.equals(formattedZkUrl2, formattedZkUrl)) {
+                    result.add(rs.getString(HA_GROUP_NAME));
+                }
+            }
+        }
+        return result;
     }
 
     @VisibleForTesting
-    HAGroupStoreClient(final Configuration conf, final 
PathChildrenCacheListener pathChildrenCacheListener) {
+    HAGroupStoreClient(final Configuration conf,
+            final PathChildrenCacheListener pathChildrenCacheListener,
+            final PathChildrenCacheListener peerPathChildrenCacheListener,
+            final String haGroupName,
+            final String zkUrl) {
+        this.conf = conf;
+        this.haGroupName = haGroupName;
+        this.zkUrl = zkUrl;
+        this.waitTimeForSyncModeInMs = 
conf.getLong(HA_SYNC_MODE_REFRESH_INTERVAL_MS,
+                DEFAULT_HA_SYNC_MODE_REFRESH_INTERVAL_MS);
+        this.waitTimeForStoreAndForwardModeInMs = conf.getLong(
+                HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS,
+                DEFAULT_HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS);
+        // Custom Event Listener
+        this.peerCustomPathChildrenCacheListener = 
peerPathChildrenCacheListener;
         try {
-            this.phoenixHaAdmin = new PhoenixHAAdmin(conf);
-            final PathChildrenCache pathChildrenCache;
-                pathChildrenCache = new 
PathChildrenCache(phoenixHaAdmin.getCurator(), ZKPaths.PATH_SEPARATOR, true);
-            final CountDownLatch latch = new CountDownLatch(1);
-            if (pathChildrenCacheListener != null) {
-                
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
-            } else {
-                pathChildrenCache.getListenable().addListener((client, event) 
-> {
-                    LOGGER.info("HAGroupStoreClient PathChildrenCache Received 
event for type {}", event.getType());
-                    final ChildData childData = event.getData();
-                    ClusterRoleRecord eventCRR = extractCRROrNull(childData);
-                    switch (event.getType()) {
-                        case CHILD_ADDED:
-                        case CHILD_UPDATED:
-                            if (eventCRR != null && eventCRR.getHaGroupName() 
!= null) {
-                                updateClusterRoleRecordMap(eventCRR);
-                            }
-                            break;
-                        case CHILD_REMOVED:
-                            // In case of CHILD_REMOVED, we get the old 
version of data that was just deleted in event.
-                            if (eventCRR != null && eventCRR.getHaGroupName() 
!= null
-                                    && !eventCRR.getHaGroupName().isEmpty()
-                                    && 
eventCRR.getRole(phoenixHaAdmin.getZkUrl()) != null) {
-                                LOGGER.info("Received CHILD_REMOVED event, 
Removing CRR {} from existing CRR Map {}", eventCRR, clusterRoleToCRRMap);
-                                final ClusterRoleRecord.ClusterRole role = 
eventCRR.getRole(phoenixHaAdmin.getZkUrl());
-                                clusterRoleToCRRMap.putIfAbsent(role, new 
ConcurrentHashMap<>());
-                                
clusterRoleToCRRMap.get(role).remove(eventCRR.getHaGroupName());
-                            }
-                            break;
-                        case INITIALIZED:
-                            latch.countDown();
-                            break;
-                        case CONNECTION_LOST:
-                        case CONNECTION_SUSPENDED:
-                            isHealthy = false;
-                            break;
-                        case CONNECTION_RECONNECTED:
-                            isHealthy = true;
-                            break;
-                        default:
-                            LOGGER.warn("Unexpected event type {}, complete 
event {}", event.getType(), event);
-                    }
-                });
+            // Initialize HAGroupStoreClient attributes
+            initializeHAGroupStoreClientAttributes(haGroupName);
+            // Initialize Phoenix HA Admin
+            this.phoenixHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, 
ZK_CONSISTENT_HA_NAMESPACE);
+            // Initialize local cache
+            this.pathChildrenCache = 
initializePathChildrenCache(phoenixHaAdmin,
+                    pathChildrenCacheListener, CACHE_TYPE_LOCAL);
+            // Initialize ZNode if not present in ZK
+            initializeZNodeIfNeeded();
+            if (this.pathChildrenCache != null) {
+                this.isHealthy = true;
+                // Initialize peer cache
+                maybeInitializePeerPathChildrenCache();
             }
-            
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
-            this.pathChildrenCache = pathChildrenCache;
-            isHealthy = 
latch.await(HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
-            buildClusterRoleToCRRMap();
+
         } catch (Exception e) {
-            isHealthy = false;
-            LOGGER.error("Unexpected error occurred while initializing 
HAGroupStoreClient, marking cache as unhealthy", e);
+            this.isHealthy = false;
+            close();
+            LOGGER.error("Unexpected error occurred while initializing 
HAGroupStoreClient, "
+                    + "marking cache as unhealthy", e);
         }
     }
 
-    private ClusterRoleRecord extractCRROrNull(final ChildData childData) {
-        if (childData != null) {
-            byte[] data = childData.getData();
-            return ClusterRoleRecord.fromJson(data).orElse(null);
+    /**
+     * Rebuilds the internal cache by querying for all needed data.
+     * This is a blocking operation that does not generate events to listeners.
+     * @param broadcastUpdate whether to broadcast the update to all 
regionservers
+     *
+     * @throws Exception if rebuild fails or client is not healthy
+     */
+    public void rebuild(boolean broadcastUpdate) throws Exception {
+        if (!isHealthy) {
+            throw new IOException("HAGroupStoreClient is not healthy");
         }
-        return null;
+        LOGGER.info("Rebuilding HAGroupStoreClient for HA group {} with 
broadcastUpdate {}",
+                haGroupName, broadcastUpdate);
+        if (broadcastUpdate) {
+            // TODO: Add a lock for the row in system table.

Review Comment:
   The lock should be taken on the region that hosts the row, so it needs to be
   done in a coprocessor hook.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to