ritegarg commented on code in PR #2244:
URL: https://github.com/apache/phoenix/pull/2244#discussion_r2261174054


##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -17,191 +17,640 @@
  */
 package org.apache.phoenix.jdbc;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
+import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.RegistryType;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.JDBCUtil;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_2;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_2;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.HA_GROUP_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.POLICY;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VERSION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ZK_URL_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ZK_URL_2;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static 
org.apache.phoenix.query.QueryServices.HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS;
+import static 
org.apache.phoenix.query.QueryServices.HA_SYNC_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.query.QueryServicesOptions
+        .DEFAULT_HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.query.QueryServicesOptions
+        .DEFAULT_HA_SYNC_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
 
 
 /**
- * Write-through cache for HAGroupStore.
+ * Main implementation of HAGroupStoreClient with peer support.
+ * Write-through cache for HAGroupStore based on {@link HAGroupStoreRecord}.
  * Uses {@link PathChildrenCache} from {@link 
org.apache.curator.framework.CuratorFramework}.
  */
 public class HAGroupStoreClient implements Closeable {
 
+    public static final String ZK_CONSISTENT_HA_NAMESPACE =
+            "phoenix" + ZKPaths.PATH_SEPARATOR + "consistentHA";
     private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS 
= 30000L;
-    private static volatile HAGroupStoreClient haGroupStoreClientInstance;
+    private static final String CACHE_TYPE_LOCAL = "LOCAL";
+    private static final String CACHE_TYPE_PEER = "PEER";
     private PhoenixHAAdmin phoenixHaAdmin;
+    private PhoenixHAAdmin peerPhoenixHaAdmin;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HAGroupStoreClient.class);
-    // Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>>
-    private final ConcurrentHashMap<ClusterRoleRecord.ClusterRole, 
ConcurrentHashMap<String, ClusterRoleRecord>> clusterRoleToCRRMap
-            = new ConcurrentHashMap<>();
-    private PathChildrenCache pathChildrenCache;
-    private volatile boolean isHealthy;
+    // Map of <ZKUrl, <HAGroupName, HAGroupStoreClientInstance>>
+    private static final Map<String, ConcurrentHashMap<String, 
HAGroupStoreClient>> instances =
+            new ConcurrentHashMap<>();
+    // HAGroupName for this instance
+    private final String haGroupName;
+    // PathChildrenCache for current cluster and HAGroupName
+    private PathChildrenCache pathChildrenCache = null;
+    // PathChildrenCache for peer cluster and HAGroupName
+    private PathChildrenCache peerPathChildrenCache = null;
+    // Whether the client is healthy
+    private volatile boolean isHealthy = false;
+    // Configuration
+    private final Configuration conf;
+    // ZK URL for the current cluster and HAGroupName
+    private String zkUrl;
+    // Peer ZK URL for peer cluster and HAGroupName
+    private String peerZKUrl = null;
+    // Peer Custom Event Listener
+    private final PathChildrenCacheListener 
peerCustomPathChildrenCacheListener;
+    // Wait time for sync mode
+    private final long waitTimeForSyncModeInMs;
+    // Wait time for store and forward mode
+    private final long waitTimeForStoreAndForwardModeInMs;
+    // Policy for the HA group
+    private HighAvailabilityPolicy policy;
+    private ClusterRole clusterRole;
+    private ClusterRole peerClusterRole;
+    private String clusterUrl;
+    private String peerClusterUrl;
+    private long clusterRoleRecordVersion;
+
 
     /**
      * Creates/gets an instance of HAGroupStoreClient.
      * Can return null instance if unable to initialize.
      *
      * @param conf configuration
+     * @param haGroupName name of the HA group. Only specified HA group is 
tracked.
+     * @param zkUrl zkUrl to use for the client. Prefer providing this 
parameter to avoid
+     *             the overhead of getting the local zkUrl from the 
configuration.
      * @return HAGroupStoreClient instance
      */
-    public static HAGroupStoreClient getInstance(Configuration conf) {
-        if (haGroupStoreClientInstance == null || 
!haGroupStoreClientInstance.isHealthy) {
+    public static HAGroupStoreClient getInstance(Configuration conf, String 
haGroupName,
+            String zkUrl) throws SQLException {
+        Preconditions.checkNotNull(haGroupName, "haGroupName cannot be null");
+        String localZkUrl = Objects.toString(zkUrl, getLocalZkUrl(conf));
+        Preconditions.checkNotNull(localZkUrl, "zkUrl cannot be null");
+        HAGroupStoreClient result = instances.getOrDefault(localZkUrl, new 
ConcurrentHashMap<>())
+                .getOrDefault(haGroupName, null);
+        if (result == null || !result.isHealthy) {
             synchronized (HAGroupStoreClient.class) {
-                if (haGroupStoreClientInstance == null || 
!haGroupStoreClientInstance.isHealthy) {
-                    haGroupStoreClientInstance = new HAGroupStoreClient(conf, 
null);
-                    if (!haGroupStoreClientInstance.isHealthy) {
-                        haGroupStoreClientInstance.close();
-                        haGroupStoreClientInstance = null;
+                result = instances.getOrDefault(localZkUrl, new 
ConcurrentHashMap<>())
+                        .getOrDefault(haGroupName, null);
+                if (result == null || !result.isHealthy) {
+                    result = new HAGroupStoreClient(conf, null, null, 
haGroupName, zkUrl);
+                    if (!result.isHealthy) {
+                        result.close();
+                        result = null;
+                    } else {
+                        instances.putIfAbsent(localZkUrl, new 
ConcurrentHashMap<>());
+                        instances.get(localZkUrl).put(haGroupName, result);
                     }
                 }
             }
         }
-        return haGroupStoreClientInstance;
+        return result;
+    }
+
+    /**
+     * Get the list of HAGroupNames from system table.
+     * We can also get the list of HAGroupNames from the system table by 
providing the zkUrl in
+     * where clause but we need to match the formatted zkUrl with the zkUrl in 
the system table so
+     * that matching is done correctly.
+     *
+     * @param zkUrl for connecting to Table
+     * @return the list of HAGroupNames
+     * @throws SQLException in case of unexpected error
+     */
+    public static List<String> getHAGroupNames(String zkUrl) throws 
SQLException {
+        List<String> result = new ArrayList<>();
+        String queryString = String.format("SELECT %s,%s,%s FROM %s", 
HA_GROUP_NAME, ZK_URL_1,
+                ZK_URL_2, SYSTEM_HA_GROUP_NAME);
+        try (PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(
+                JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl);
+             Statement stmt = conn.createStatement();
+             ResultSet rs = stmt.executeQuery(queryString)) {
+            while (rs.next()) {
+                String zkUrl1 = rs.getString(ZK_URL_1);
+                String zkUrl2 = rs.getString(ZK_URL_2);
+                String formattedZkUrl1 = JDBCUtil.formatUrl(zkUrl1, 
RegistryType.ZK);
+                String formattedZkUrl2 = JDBCUtil.formatUrl(zkUrl2, 
RegistryType.ZK);
+                String formattedZkUrl = JDBCUtil.formatUrl(zkUrl, 
RegistryType.ZK);
+                if (StringUtils.equals(formattedZkUrl1, formattedZkUrl) ||
+                        StringUtils.equals(formattedZkUrl2, formattedZkUrl)) {
+                    result.add(rs.getString(HA_GROUP_NAME));
+                }
+            }
+        }
+        return result;
     }
 
     @VisibleForTesting
-    HAGroupStoreClient(final Configuration conf, final 
PathChildrenCacheListener pathChildrenCacheListener) {
+    HAGroupStoreClient(final Configuration conf,
+            final PathChildrenCacheListener pathChildrenCacheListener,
+            final PathChildrenCacheListener peerPathChildrenCacheListener,
+            final String haGroupName,
+            final String zkUrl) {
+        this.conf = conf;
+        this.haGroupName = haGroupName;
+        this.zkUrl = zkUrl;
+        this.waitTimeForSyncModeInMs = 
conf.getLong(HA_SYNC_MODE_REFRESH_INTERVAL_MS,
+                DEFAULT_HA_SYNC_MODE_REFRESH_INTERVAL_MS);
+        this.waitTimeForStoreAndForwardModeInMs = conf.getLong(
+                HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS,
+                DEFAULT_HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS);
+        // Custom Event Listener
+        this.peerCustomPathChildrenCacheListener = 
peerPathChildrenCacheListener;
         try {
-            this.phoenixHaAdmin = new PhoenixHAAdmin(conf);
-            final PathChildrenCache pathChildrenCache;
-                pathChildrenCache = new 
PathChildrenCache(phoenixHaAdmin.getCurator(), ZKPaths.PATH_SEPARATOR, true);
-            final CountDownLatch latch = new CountDownLatch(1);
-            if (pathChildrenCacheListener != null) {
-                
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
-            } else {
-                pathChildrenCache.getListenable().addListener((client, event) 
-> {
-                    LOGGER.info("HAGroupStoreClient PathChildrenCache Received 
event for type {}", event.getType());
-                    final ChildData childData = event.getData();
-                    ClusterRoleRecord eventCRR = extractCRROrNull(childData);
-                    switch (event.getType()) {
-                        case CHILD_ADDED:
-                        case CHILD_UPDATED:
-                            if (eventCRR != null && eventCRR.getHaGroupName() 
!= null) {
-                                updateClusterRoleRecordMap(eventCRR);
-                            }
-                            break;
-                        case CHILD_REMOVED:
-                            // In case of CHILD_REMOVED, we get the old 
version of data that was just deleted in event.
-                            if (eventCRR != null && eventCRR.getHaGroupName() 
!= null
-                                    && !eventCRR.getHaGroupName().isEmpty()
-                                    && 
eventCRR.getRole(phoenixHaAdmin.getZkUrl()) != null) {
-                                LOGGER.info("Received CHILD_REMOVED event, 
Removing CRR {} from existing CRR Map {}", eventCRR, clusterRoleToCRRMap);
-                                final ClusterRoleRecord.ClusterRole role = 
eventCRR.getRole(phoenixHaAdmin.getZkUrl());
-                                clusterRoleToCRRMap.putIfAbsent(role, new 
ConcurrentHashMap<>());
-                                
clusterRoleToCRRMap.get(role).remove(eventCRR.getHaGroupName());
-                            }
-                            break;
-                        case INITIALIZED:
-                            latch.countDown();
-                            break;
-                        case CONNECTION_LOST:
-                        case CONNECTION_SUSPENDED:
-                            isHealthy = false;
-                            break;
-                        case CONNECTION_RECONNECTED:
-                            isHealthy = true;
-                            break;
-                        default:
-                            LOGGER.warn("Unexpected event type {}, complete 
event {}", event.getType(), event);
-                    }
-                });
+            // Initialize HAGroupStoreClient attributes
+            initializeHAGroupStoreClientAttributes(haGroupName);
+            // Initialize Phoenix HA Admin
+            this.phoenixHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, 
ZK_CONSISTENT_HA_NAMESPACE);
+            // Initialize local cache
+            this.pathChildrenCache = 
initializePathChildrenCache(phoenixHaAdmin,
+                    pathChildrenCacheListener, CACHE_TYPE_LOCAL);
+            // Initialize ZNode if not present in ZK
+            initializeZNodeIfNeeded();
+            if (this.pathChildrenCache != null) {
+                this.isHealthy = true;
+                // Initialize peer cache
+                maybeInitializePeerPathChildrenCache();
             }
-            
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
-            this.pathChildrenCache = pathChildrenCache;
-            isHealthy = 
latch.await(HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
-            buildClusterRoleToCRRMap();
+
         } catch (Exception e) {
-            isHealthy = false;
-            LOGGER.error("Unexpected error occurred while initializing 
HAGroupStoreClient, marking cache as unhealthy", e);
+            this.isHealthy = false;
+            close();
+            LOGGER.error("Unexpected error occurred while initializing 
HAGroupStoreClient, "
+                    + "marking cache as unhealthy", e);
         }
     }
 
-    private ClusterRoleRecord extractCRROrNull(final ChildData childData) {
-        if (childData != null) {
-            byte[] data = childData.getData();
-            return ClusterRoleRecord.fromJson(data).orElse(null);
+    /**
+     * Rebuilds the internal cache by querying for all needed data.
+     * This is a blocking operation that does not generate events to listeners.
+     * @param broadcastUpdate whether to broadcast the update to all 
regionservers
+     *
+     * @throws Exception if rebuild fails or client is not healthy
+     */
+    public void rebuild(boolean broadcastUpdate) throws Exception {
+        if (!isHealthy) {
+            throw new IOException("HAGroupStoreClient is not healthy");
         }
-        return null;
+        LOGGER.info("Rebuilding HAGroupStoreClient for HA group {} with 
broadcastUpdate {}",
+                haGroupName, broadcastUpdate);
+        if (broadcastUpdate) {
+            // TODO: Add a lock for the row in system table.
+            // TODO: broadcast update all regionservers' HAGroupStoreClient 
Attributes
+            // with broadcastUpdate = false
+        }
+        initializeHAGroupStoreClientAttributes(haGroupName);
+        initializeZNodeIfNeeded();
+        maybeInitializePeerPathChildrenCache();
+
+        // NOTE: this is a BLOCKING method.
+        // Completely rebuild the internal cache by querying for all needed 
data
+        // WITHOUT generating any events to send to listeners.
+        if (pathChildrenCache != null) {
+            pathChildrenCache.rebuild();
+        }
+        if (peerPathChildrenCache != null) {
+            peerPathChildrenCache.rebuild();
+        }
+        // TODO: Remove the lock for the row in system table if acquired.
+        LOGGER.info("Rebuild Complete for HAGroupStoreClient for HA group {}", 
haGroupName);
     }
 
-    private void updateClusterRoleRecordMap(final ClusterRoleRecord crr) {
-        if (crr != null && crr.getHaGroupName() != null && 
crr.getRole(phoenixHaAdmin.getZkUrl()) != null) {
-            ClusterRoleRecord.ClusterRole role = 
crr.getRole(phoenixHaAdmin.getZkUrl());
-            LOGGER.info("Updating Existing CRR Map {} with new CRR {}", 
clusterRoleToCRRMap, crr);
-            clusterRoleToCRRMap.putIfAbsent(role, new ConcurrentHashMap<>());
-            clusterRoleToCRRMap.get(role).put(crr.getHaGroupName(), crr);
-            LOGGER.info("Added new CRR {} to CRR Map", crr);
-            // Remove any pre-existing mapping with any other role for this 
HAGroupName
-            for (ClusterRoleRecord.ClusterRole mapRole : 
clusterRoleToCRRMap.keySet()) {
-                if (mapRole != role) {
-                    ConcurrentHashMap<String, ClusterRoleRecord> roleWiseMap = 
clusterRoleToCRRMap.get(mapRole);
-                    if (roleWiseMap.containsKey(crr.getHaGroupName())) {
-                        LOGGER.info("Removing any pre-existing mapping with 
role {} for HAGroupName {}", mapRole, crr.getHaGroupName());
-                        roleWiseMap.remove(crr.getHaGroupName());
-                    }
-                }
-            }
-            LOGGER.info("Final Updated CRR Map {}", clusterRoleToCRRMap);
+    /**
+     * Get HAGroupStoreRecord from local quorum.
+     *
+     * @return HAGroupStoreRecord for the specified HA group name, or null if 
not found
+     * @throws IOException if the client is not healthy
+     */
+    public HAGroupStoreRecord getHAGroupStoreRecord() throws IOException {
+        if (!isHealthy) {
+            throw new IOException("HAGroupStoreClient is not healthy");
         }
+        return fetchCacheRecord(this.pathChildrenCache, 
CACHE_TYPE_LOCAL).getLeft();
     }
 
-    private void buildClusterRoleToCRRMap() {
-        List<ClusterRoleRecord> clusterRoleRecords = 
pathChildrenCache.getCurrentData().stream().map(this::extractCRROrNull)
-                .filter(Objects::nonNull).collect(Collectors.toList());
-        for (ClusterRoleRecord crr : clusterRoleRecords) {
-            updateClusterRoleRecordMap(crr);
+    /**
+     * Set the HA group status for the specified HA group name.
+     * Checks if the status is needed to be updated based on logic in 
isUpdateNeeded function.
+     *
+     * @param haGroupState the HA group state to set
+     * @throws IOException if the client is not healthy or the operation fails
+     * @throws StaleHAGroupStoreRecordVersionException if the version is stale
+     * @throws InvalidClusterRoleTransitionException when transition is not 
valid
+     */
+    public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState 
haGroupState)
+            throws IOException, StaleHAGroupStoreRecordVersionException,
+            InvalidClusterRoleTransitionException {
+        Preconditions.checkNotNull(haGroupState, "haGroupState cannot be 
null");
+        if (!isHealthy) {
+            throw new IOException("HAGroupStoreClient is not healthy");
+        }
+        Pair<HAGroupStoreRecord, Stat> cacheRecord = fetchCacheRecord(
+                this.pathChildrenCache, CACHE_TYPE_LOCAL);
+        HAGroupStoreRecord currentHAGroupStoreRecord = cacheRecord.getLeft();
+        Stat currentHAGroupStoreRecordStat = cacheRecord.getRight();
+        if (currentHAGroupStoreRecord == null) {
+            throw new IOException("Current HAGroupStoreRecordStat in cache is 
null, "
+                    + "cannot update HAGroupStoreRecord, the record should be 
initialized "
+                    + "in System Table first" + haGroupName);
         }
+        if (isUpdateNeeded(currentHAGroupStoreRecord.getHAGroupState(),
+                currentHAGroupStoreRecordStat.getMtime(), haGroupState)) {
+                HAGroupStoreRecord newHAGroupStoreRecord = new 
HAGroupStoreRecord(
+                        currentHAGroupStoreRecord.getProtocolVersion(),
+                        currentHAGroupStoreRecord.getHaGroupName(),
+                        haGroupState
+                );
+                // TODO: Check if cluster role is changing, if so, we need to 
update
+                // the system table first
+                // Lock the row in System Table and make sure update is 
reflected
+                // in all regionservers
+                // It should automatically update the ZK record as well.
+                phoenixHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
+                        newHAGroupStoreRecord, 
currentHAGroupStoreRecordStat.getVersion());
+        } else {
+            LOGGER.info("Not updating HAGroupStoreRecord for HA group {} with 
state {}",
+                    haGroupName, haGroupState);
+        }
+    }
+
+    /**
+     * Returns the ClusterRoleRecord for the cluster pair.
+     * Information in System Table for peer cluster might be stale,
+     * so we need to get the latest information from ZK.
+     *
+     * @return ClusterRoleRecord for the cluster pair
+     * @throws IOException
+     */
+    public ClusterRoleRecord getClusterRoleRecord() throws IOException {
+        HAGroupStoreRecord peerHAGroupStoreRecord = 
getHAGroupStoreRecordFromPeer();
+        ClusterRoleRecord.ClusterRole peerClusterRole = peerHAGroupStoreRecord 
!= null
+                ? peerHAGroupStoreRecord.getClusterRole()
+                : ClusterRole.UNKNOWN;
+        return new ClusterRoleRecord(this.haGroupName,
+                                    this.policy,
+                                    this.clusterUrl,
+                                    this.clusterRole,
+                                    this.peerClusterUrl,
+                                    peerClusterRole,
+                                    this.clusterRoleRecordVersion);
     }
 
-    public void rebuild() throws Exception {
+    /**
+     * Get HAGroupStoreRecord from peer cluster.
+     *
+     * @return HAGroupStoreRecord for the specified HA group name, or null if 
not found
+     * @throws IOException if the client is not healthy
+     */
+    private HAGroupStoreRecord getHAGroupStoreRecordFromPeer() throws 
IOException {
         if (!isHealthy) {
             throw new IOException("HAGroupStoreClient is not healthy");
         }
-        LOGGER.info("Rebuilding HAGroupStoreClient for HA groups");
-        // NOTE: this is a BLOCKING method.
-        // Completely rebuild the internal cache by querying for all needed 
data
-        // WITHOUT generating any events to send to listeners.
-        pathChildrenCache.rebuild();
-        buildClusterRoleToCRRMap();
-        LOGGER.info("Rebuild Complete for HAGroupStoreClient");
+        return fetchCacheRecord(this.peerPathChildrenCache, 
CACHE_TYPE_PEER).getLeft();
+    }
+
+    private void initializeZNodeIfNeeded() throws IOException,
+            StaleHAGroupStoreRecordVersionException {
+        // Sometimes the ZNode might not be available in local cache yet, so 
better to check
+        // in ZK directly if we need to initialize
+        Pair<HAGroupStoreRecord, Stat> cacheRecordFromZK =
+                
phoenixHaAdmin.getHAGroupStoreRecordInZooKeeper(this.haGroupName);
+        HAGroupStoreRecord haGroupStoreRecord = cacheRecordFromZK.getLeft();
+        HAGroupStoreRecord newHAGroupStoreRecord = new HAGroupStoreRecord(
+            HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
+            haGroupName,
+            this.clusterRole.getDefaultHAGroupState()
+        );
+        // Only update current ZNode if it doesn't have the same role as 
present in System Table.
+        // If not exists, then create ZNode
+        if (haGroupStoreRecord != null &&
+                !haGroupStoreRecord.getClusterRole().equals(this.clusterRole)) 
{
+            phoenixHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
+                    newHAGroupStoreRecord, 
cacheRecordFromZK.getRight().getVersion());
+        } else if (haGroupStoreRecord == null) {
+            
phoenixHaAdmin.createHAGroupStoreRecordInZooKeeper(newHAGroupStoreRecord);
+        }
+    }
+
+    private void initializeHAGroupStoreClientAttributes(String haGroupName) 
throws SQLException {
+        String queryString = String.format("SELECT * FROM %s WHERE %s = '%s'",
+                SYSTEM_HA_GROUP_NAME, HA_GROUP_NAME, haGroupName);
+        try (PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(
+                JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl);
+             Statement stmt = conn.createStatement();
+             ResultSet rs = stmt.executeQuery(queryString)) {
+            if (rs.next()) {
+                this.policy = 
HighAvailabilityPolicy.valueOf(rs.getString(POLICY));
+                String zkUrl1 = rs.getString(ZK_URL_1);
+                String zkUrl2 = rs.getString(ZK_URL_2);
+                String clusterRole1 = rs.getString(CLUSTER_ROLE_1);
+                String clusterRole2 = rs.getString(CLUSTER_ROLE_2);
+                String clusterUrl1 = rs.getString(CLUSTER_URL_1);
+                String clusterUrl2 = rs.getString(CLUSTER_URL_2);
+                this.clusterRoleRecordVersion = rs.getLong(VERSION);
+                Preconditions.checkNotNull(zkUrl1, "ZK_URL_1 in System Table 
cannot be null");
+                Preconditions.checkNotNull(zkUrl2, "ZK_URL_2 in System Table 
cannot be null");
+                String formattedZkUrl1 = JDBCUtil.formatUrl(zkUrl1, 
RegistryType.ZK);
+                String formattedZkUrl2 = JDBCUtil.formatUrl(zkUrl2, 
RegistryType.ZK);
+                String formattedZkUrl = JDBCUtil.formatUrl(this.zkUrl, 
RegistryType.ZK);
+                if (StringUtils.equals(formattedZkUrl1, formattedZkUrl)) {
+                    this.peerZKUrl = formattedZkUrl2;
+                    this.clusterRole = ClusterRoleRecord.ClusterRole.from(
+                            clusterRole1.getBytes(StandardCharsets.UTF_8));
+                    this.peerClusterRole = ClusterRoleRecord.ClusterRole.from(
+                            clusterRole2.getBytes(StandardCharsets.UTF_8));
+                    this.clusterUrl = clusterUrl1;
+                    this.peerClusterUrl = clusterUrl2;
+                } else if (StringUtils.equals(formattedZkUrl2, 
formattedZkUrl)) {
+                    this.peerZKUrl = JDBCUtil.formatUrl(zkUrl1, 
RegistryType.ZK);
+                    this.clusterRole = ClusterRoleRecord.ClusterRole.from(
+                            clusterRole2.getBytes(StandardCharsets.UTF_8));
+                    this.peerClusterRole = ClusterRoleRecord.ClusterRole.from(
+                            clusterRole1.getBytes(StandardCharsets.UTF_8));
+                    this.clusterUrl = clusterUrl2;
+                    this.peerClusterUrl = clusterUrl1;
+                }
+            } else {
+                throw new SQLException("HAGroupStoreRecord not found for HA 
group name: " +
+                        haGroupName + " in System Table " + 
SYSTEM_HA_GROUP_NAME);
+            }
+        }
+        Preconditions.checkNotNull(this.clusterRole,
+                "Cluster role in System Table cannot be null");
+        Preconditions.checkNotNull(this.peerClusterRole,
+                "Peer cluster role in System Table cannot be null");
+        Preconditions.checkNotNull(this.clusterUrl,
+                "Cluster URL in System Table cannot be null");
+        Preconditions.checkNotNull(this.peerZKUrl,
+                "Peer ZK URL in System Table cannot be null");
+        Preconditions.checkNotNull(this.peerClusterUrl,
+                "Peer Cluster URL in System Table cannot be null");
+        Preconditions.checkNotNull(this.clusterRoleRecordVersion,
+                "Cluster role record version in System Table cannot be null");
+    }
+
+    private void maybeInitializePeerPathChildrenCache() {
+        if (StringUtils.isNotBlank(this.peerZKUrl)) {
+            try {
+                // Setup peer connection if needed (first time or ZK Url 
changed)
+                if (peerPathChildrenCache == null
+                    || (peerPhoenixHaAdmin != null &&
+                            !StringUtils.equals(this.peerZKUrl, 
peerPhoenixHaAdmin.getZkUrl()))) {
+                    // Clean up existing peer connection if it exists
+                    closePeerConnection();
+                    // Setup new peer connection
+                    this.peerPhoenixHaAdmin
+                            = new PhoenixHAAdmin(this.peerZKUrl, conf, 
ZK_CONSISTENT_HA_NAMESPACE);
+                    // Create new PeerPathChildrenCache
+                    this.peerPathChildrenCache = 
initializePathChildrenCache(peerPhoenixHaAdmin,
+                            this.peerCustomPathChildrenCacheListener, 
CACHE_TYPE_PEER);
+                }
+            } catch (Exception e) {
+                closePeerConnection();
+                LOGGER.error("Unable to initialize PeerPathChildrenCache for 
HAGroupStoreClient",
+                        e);
+                // Don't think we should mark HAGroupStoreClient as unhealthy 
if
+                // peerCache is unhealthy, if needed we can introduce a config 
to control behavior.
+            }
+        } else {
+            // Close Peer Cache for this HAGroupName if currentClusterRecord 
is null
+            // or peerZKUrl is blank
+            closePeerConnection();
+            LOGGER.error("Not initializing PeerPathChildrenCache for 
HAGroupStoreClient "
+                    + "with HAGroupName {} as peerZKUrl is blank", 
haGroupName);
+        }
+    }
+
+    private PathChildrenCache initializePathChildrenCache(PhoenixHAAdmin admin,
+            PathChildrenCacheListener customListener, String cacheType) {
+        LOGGER.info("Initializing {} PathChildrenCache with URL {}", 
cacheType, admin.getZkUrl());
+        PathChildrenCache newPathChildrenCache = null;
+        try {
+            newPathChildrenCache = new PathChildrenCache(admin.getCurator(),
+                    ZKPaths.PATH_SEPARATOR, true);
+            final CountDownLatch latch = new CountDownLatch(1);
+            newPathChildrenCache.getListenable().addListener(customListener != 
null
+                    ? customListener
+                    : createCacheListener(latch, cacheType));
+            
newPathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+            boolean initialized = 
latch.await(HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS,
+                    TimeUnit.MILLISECONDS);
+            return initialized ? newPathChildrenCache : null;
+        } catch (Exception e) {
+            if (newPathChildrenCache != null) {
+                try {
+                    newPathChildrenCache.close();
+                } catch (IOException ioe) {
+                    LOGGER.error("Failed to close {} PathChildrenCache with 
ZKUrl",
+                            cacheType, ioe);
+                }
+            }
+            LOGGER.error("Failed to initialize {} PathChildrenCache", 
cacheType, e);
+            return null;
+        }
+    }
+
+    private PathChildrenCacheListener createCacheListener(CountDownLatch 
latch, String cacheType) {
+        return (client, event) -> {
+            final ChildData childData = event.getData();
+            HAGroupStoreRecord eventRecord = 
extractHAGroupStoreRecordOrNull(childData);
+            LOGGER.info("HAGroupStoreClient Cache {} received event {} type {} 
at {}",
+                    cacheType, eventRecord, event.getType(), 
System.currentTimeMillis());
+            switch (event.getType()) {
+                // TODO: Add support for event watcher for HAGroupStoreRecord
+                // case CHILD_ADDED:
+                // case CHILD_UPDATED:
+                // case CHILD_REMOVED:
+                case INITIALIZED:
+                    latch.countDown();
+                    break;
+                case CONNECTION_LOST:
+                case CONNECTION_SUSPENDED:
+                    if (CACHE_TYPE_LOCAL.equals(cacheType)) {
+                        isHealthy = false;
+                    }
+                    LOGGER.warn("{} HAGroupStoreClient cache connection 
lost/suspended",
+                            cacheType);
+                    break;
+                case CONNECTION_RECONNECTED:
+                    if (CACHE_TYPE_LOCAL.equals(cacheType)) {
+                        isHealthy = true;
+                    }
+                    LOGGER.info("{} HAGroupStoreClient cache connection 
reconnected", cacheType);
+                    break;
+                default:
+                    LOGGER.warn("Unexpected {} event type {}, complete event 
{}",
+                            cacheType, event.getType(), event);
+            }
+        };
+    }
+
+
+    private Pair<HAGroupStoreRecord, Stat> fetchCacheRecord(PathChildrenCache 
cache,
+            String cacheType) {
+        if (cache == null) {
+            LOGGER.warn("{} HAGroupStoreClient cache is null, returning null", 
cacheType);
+            return Pair.of(null, null);
+        }
+
+        String targetPath = toPath(this.haGroupName);
+        // Try to get record from current cache data
+        Pair<HAGroupStoreRecord, Stat> result = extractRecordAndStat(cache, 
targetPath, cacheType);
+        if (result.getLeft() != null) {
+            return result;
+        }
+
+        if (cacheType.equals(CACHE_TYPE_PEER)) {
+            return Pair.of(null, null);
+        }
+        // If no record found, try to rebuild and fetch again
+        LOGGER.info("No record found at path {} for {} cluster, trying to 
initialize ZNode "
+                + "from System Table in case it might have been deleted",
+                    targetPath, cacheType);
+        try {
+            rebuild(false);
+            return extractRecordAndStat(cache, targetPath, cacheType);
+        } catch (Exception e) {
+            LOGGER.error("Failed to initialize ZNode from System Table, giving 
up "
+                    + "and returning null", e);
+            return Pair.of(null, null);
+        }
+    }
+
+    private Pair<HAGroupStoreRecord, Stat> 
extractRecordAndStat(PathChildrenCache cache,
+            String targetPath, String cacheType) {
+        ChildData childData = cache.getCurrentData(targetPath);
+        if (childData != null) {
+            HAGroupStoreRecord record = 
extractHAGroupStoreRecordOrNull(childData);
+            Stat currentStat = childData.getStat();
+            LOGGER.info("Built {} cluster record: {}", cacheType, record);
+            return Pair.of(record, currentStat);
+        }
+        return Pair.of(null, null);
+    }
+
+    private HAGroupStoreRecord extractHAGroupStoreRecordOrNull(final ChildData 
childData) {
+        if (childData != null) {
+            byte[] data = childData.getData();
+            return HAGroupStoreRecord.fromJson(data).orElse(null);
+        }
+        return null;
     }
 
 
+    /**
+     * Closes the peer connection and cleans up peer-related resources.
+     */
+    private void closePeerConnection() {
+        try {
+            if (peerPathChildrenCache != null) {
+                peerPathChildrenCache.close();
+                peerPathChildrenCache = null;
+            }
+            if (peerPhoenixHaAdmin != null) {
+                peerPhoenixHaAdmin.close();
+                peerPhoenixHaAdmin = null;
+            }
+        } catch (Exception e) {
+            LOGGER.warn("Failed to close peer connection", e);
+        }
+    }
+
     @Override
     public void close() {
         try {
             LOGGER.info("Closing HAGroupStoreClient");
-            clusterRoleToCRRMap.clear();
             if (pathChildrenCache != null) {
                 pathChildrenCache.close();
+                pathChildrenCache = null;
             }
+            closePeerConnection();
             LOGGER.info("Closed HAGroupStoreClient");
         } catch (IOException e) {
             LOGGER.error("Exception closing HAGroupStoreClient", e);
         }
     }
 
-    public List<ClusterRoleRecord> 
getCRRsByClusterRole(ClusterRoleRecord.ClusterRole clusterRole) throws 
IOException {
-        if (!isHealthy) {
-            throw new IOException("HAGroupStoreClient is not healthy");
+    /**
+     * Checks if the HAGroupStoreRecord needs to be updated.
+     * If the cluster role is allowed to transition to the new state and the 
status refresh
+     * interval has expired, the HAGroupStoreRecord needs to be updated.
+     * If the transition is not allowed, an exception is thrown.
+     *
+     * @param currentHAGroupState the current HAGroupState of the 
HAGroupStoreRecord
+     * @param currentHAGroupStoreRecordMtime the last modified time of the 
current
+     *                                      HAGroupStoreRecord
+     * @param newHAGroupState the cluster state to check
+     * @return true if the HAGroupStoreRecord needs to be updated, false 
otherwise
+     * @throws InvalidClusterRoleTransitionException if the cluster role 
transition is invalid
+     */
+    private boolean isUpdateNeeded(HAGroupStoreRecord.HAGroupState 
currentHAGroupState,
+                                   long currentHAGroupStoreRecordMtime,
+                                   HAGroupStoreRecord.HAGroupState 
newHAGroupState)
+            throws InvalidClusterRoleTransitionException {
+        long waitTime = 0L;
+        if (currentHAGroupState.isTransitionAllowed(newHAGroupState)) {
+            if (currentHAGroupState == HAGroupState.ACTIVE_NOT_IN_SYNC
+                    && newHAGroupState == HAGroupState.ACTIVE_IN_SYNC) {
+                waitTime = waitTimeForSyncModeInMs;
+            } else if (currentHAGroupState == HAGroupState.ACTIVE_NOT_IN_SYNC
+            && newHAGroupState == HAGroupState.ACTIVE_NOT_IN_SYNC) {
+                //This is to avoid extra requests to ZK for updates.
+                waitTime = waitTimeForStoreAndForwardModeInMs;
+            }
+        } else {
+            throw new InvalidClusterRoleTransitionException("Cannot transition 
from "
+                    + currentHAGroupState + " to " + newHAGroupState);
         }
-        return 
ImmutableList.copyOf(clusterRoleToCRRMap.getOrDefault(clusterRole, new 
ConcurrentHashMap<>()).values());
+        return ((System.currentTimeMillis() - currentHAGroupStoreRecordMtime) 
> waitTime);

Review Comment:
   I added it to reduce unnecessary updates to ZK. But I think it is not 
required. I have removed the config and its usage



##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java:
##########
@@ -17,191 +17,640 @@
  */
 package org.apache.phoenix.jdbc;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
+import org.apache.phoenix.exception.StaleHAGroupStoreRecordVersionException;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.RegistryType;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.JDBCUtil;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_ROLE_2;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLUSTER_URL_2;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.HA_GROUP_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.POLICY;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VERSION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ZK_URL_1;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ZK_URL_2;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static 
org.apache.phoenix.query.QueryServices.HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS;
+import static 
org.apache.phoenix.query.QueryServices.HA_SYNC_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.query.QueryServicesOptions
+        .DEFAULT_HA_STORE_AND_FORWARD_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.query.QueryServicesOptions
+        .DEFAULT_HA_SYNC_MODE_REFRESH_INTERVAL_MS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
 
 
 /**
- * Write-through cache for HAGroupStore.
+ * Main implementation of HAGroupStoreClient with peer support.
+ * Write-through cache for HAGroupStore based on {@link HAGroupStoreRecord}.
  * Uses {@link PathChildrenCache} from {@link 
org.apache.curator.framework.CuratorFramework}.
  */
 public class HAGroupStoreClient implements Closeable {
 
+    public static final String ZK_CONSISTENT_HA_NAMESPACE =
+            "phoenix" + ZKPaths.PATH_SEPARATOR + "consistentHA";
     private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS 
= 30000L;
-    private static volatile HAGroupStoreClient haGroupStoreClientInstance;
+    private static final String CACHE_TYPE_LOCAL = "LOCAL";
+    private static final String CACHE_TYPE_PEER = "PEER";
     private PhoenixHAAdmin phoenixHaAdmin;
+    private PhoenixHAAdmin peerPhoenixHaAdmin;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HAGroupStoreClient.class);
-    // Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>>
-    private final ConcurrentHashMap<ClusterRoleRecord.ClusterRole, 
ConcurrentHashMap<String, ClusterRoleRecord>> clusterRoleToCRRMap
-            = new ConcurrentHashMap<>();
-    private PathChildrenCache pathChildrenCache;
-    private volatile boolean isHealthy;
+    // Map of <ZKUrl, <HAGroupName, HAGroupStoreClientInstance>>
+    private static final Map<String, ConcurrentHashMap<String, 
HAGroupStoreClient>> instances =
+            new ConcurrentHashMap<>();
+    // HAGroupName for this instance
+    private final String haGroupName;
+    // PathChildrenCache for current cluster and HAGroupName
+    private PathChildrenCache pathChildrenCache = null;
+    // PathChildrenCache for peer cluster and HAGroupName
+    private PathChildrenCache peerPathChildrenCache = null;
+    // Whether the client is healthy
+    private volatile boolean isHealthy = false;
+    // Configuration
+    private final Configuration conf;
+    // ZK URL for the current cluster and HAGroupName
+    private String zkUrl;
+    // Peer ZK URL for peer cluster and HAGroupName
+    private String peerZKUrl = null;
+    // Peer Custom Event Listener
+    private final PathChildrenCacheListener 
peerCustomPathChildrenCacheListener;
+    // Wait time for sync mode
+    private final long waitTimeForSyncModeInMs;
+    // Wait time for store and forward mode
+    private final long waitTimeForStoreAndForwardModeInMs;
+    // Policy for the HA group
+    private HighAvailabilityPolicy policy;
+    private ClusterRole clusterRole;
+    private ClusterRole peerClusterRole;
+    private String clusterUrl;
+    private String peerClusterUrl;
+    private long clusterRoleRecordVersion;
+
 
     /**
      * Creates/gets an instance of HAGroupStoreClient.
      * Can return null instance if unable to initialize.
      *
      * @param conf configuration
+     * @param haGroupName name of the HA group. Only specified HA group is 
tracked.
+     * @param zkUrl zkUrl to use for the client. Prefer providing this 
parameter to avoid
+     *             the overhead of getting the local zkUrl from the 
configuration.
      * @return HAGroupStoreClient instance
      */
-    public static HAGroupStoreClient getInstance(Configuration conf) {
-        if (haGroupStoreClientInstance == null || 
!haGroupStoreClientInstance.isHealthy) {
+    public static HAGroupStoreClient getInstance(Configuration conf, String 
haGroupName,

Review Comment:
   Updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to