This is an automated email from the ASF dual-hosted git repository.

lokiore pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new e853980bc0 PHOENIX-7493 Graceful Failover with Phoenix HA  (#2075)
e853980bc0 is described below

commit e853980bc04ae44e6670fd60857360d67831e607
Author: ritegarg <58840065+riteg...@users.noreply.github.com>
AuthorDate: Fri Mar 28 09:31:22 2025 -0700

    PHOENIX-7493 Graceful Failover with Phoenix HA  (#2075)
---
 .../phoenix/cache/ServerMetadataCacheImpl.java     |   3 -
 .../org/apache/phoenix/jdbc/ClusterRoleRecord.java |   4 +-
 .../jdbc/ClusterRoleRecordGeneratorTool.java       |   2 +-
 .../apache/phoenix/jdbc/HAGroupStoreClient.java    | 207 +++++++++
 .../apache/phoenix/jdbc/HAGroupStoreManager.java   |  80 ++++
 .../apache/phoenix/jdbc/HighAvailabilityGroup.java |   2 -
 .../phoenix/jdbc/HighAvailabilityPolicy.java       |   2 +-
 .../org/apache/phoenix/jdbc/PhoenixHAAdmin.java    | 494 +++++++++++++++++++++
 .../apache/phoenix/jdbc/PhoenixHAAdminTool.java    | 466 +------------------
 .../org/apache/phoenix/query/QueryServices.java    |   2 +
 .../apache/phoenix/query/QueryServicesOptions.java |   6 +-
 .../protobuf/RegionServerEndpointService.proto     |   9 +
 .../coprocessor/PhoenixRegionServerEndpoint.java   |  20 +
 .../UngroupedAggregateRegionObserver.java          |  18 +
 .../phoenix/hbase/index/IndexRegionObserver.java   |  12 +
 phoenix-core/pom.xml                               |   4 +
 .../phoenix/cache/ServerMetadataCacheIT.java       |   2 -
 .../end2end/PhoenixRegionServerEndpointIT.java     |  13 +-
 .../apache/phoenix/jdbc/HAGroupStoreClientIT.java  | 427 ++++++++++++++++++
 .../apache/phoenix/jdbc/HAGroupStoreManagerIT.java |  88 ++++
 .../jdbc/HighAvailabilityTestingUtility.java       |  12 +-
 .../phoenix/jdbc/PhoenixHAAdminToolTest.java       |  12 +-
 22 files changed, 1409 insertions(+), 476 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
index 5f9aa10455..91b783628b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java
@@ -20,8 +20,6 @@ package org.apache.phoenix.cache;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -33,7 +31,6 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
 import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.phoenix.thirdparty.com.google.common.cache.RemovalListener;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
index aa28c6e85f..c2af18df8b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java
@@ -59,13 +59,13 @@ public class ClusterRoleRecord {
      * take traffic, standby and offline do not, and unknown is used if the state cannot be determined.
      */
     public enum ClusterRole {
-        ACTIVE, STANDBY, OFFLINE, UNKNOWN;
+        ACTIVE, STANDBY, OFFLINE, UNKNOWN, ACTIVE_TO_STANDBY;
 
         /**
          * @return true if a cluster with this role can be connected, otherwise false
          */
         public boolean canConnect() {
-            return this == ACTIVE || this == STANDBY;
+            return this == ACTIVE || this == STANDBY || this == ACTIVE_TO_STANDBY;
         }
 
         public static ClusterRole from(byte[] bytes) {
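
The new ACTIVE_TO_STANDBY role models a cluster that is draining during a graceful failover: clients may keep their connections while server-side checks block new mutations (see HAGroupStoreManager below). A minimal sketch of the intended semantics, assuming only the enum shown above:

    // ACTIVE_TO_STANDBY still accepts connections, unlike OFFLINE and UNKNOWN.
    ClusterRoleRecord.ClusterRole draining = ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY;
    assert draining.canConnect();                                // clients stay connected while draining
    assert !ClusterRoleRecord.ClusterRole.OFFLINE.canConnect();  // offline clusters are not connectable
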
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java
index 93899f87a2..03c29320d4 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java
@@ -40,7 +40,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ATTR_PREFIX;
-import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
 
 
 /**
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
new file mode 100644
index 0000000000..05f4247ace
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+/**
+ * Write-through cache for HAGroupStore.
+ * Uses {@link PathChildrenCache} from {@link org.apache.curator.framework.CuratorFramework}.
+ */
+public class HAGroupStoreClient implements Closeable {
+
+    private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS = 30000L;
+    private static volatile HAGroupStoreClient haGroupStoreClientInstance;
+    private PhoenixHAAdmin phoenixHaAdmin;
+    private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class);
+    // Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>>
+    private final ConcurrentHashMap<ClusterRoleRecord.ClusterRole, ConcurrentHashMap<String, ClusterRoleRecord>> clusterRoleToCRRMap
+            = new ConcurrentHashMap<>();
+    private PathChildrenCache pathChildrenCache;
+    private volatile boolean isHealthy;
+
+    /**
+     * Creates/gets an instance of HAGroupStoreClient.
+     * Can return a null instance if it is unable to initialize.
+     *
+     * @param conf configuration
+     * @return HAGroupStoreClient instance
+     */
+    public static HAGroupStoreClient getInstance(Configuration conf) {
+        if (haGroupStoreClientInstance == null || !haGroupStoreClientInstance.isHealthy) {
+            synchronized (HAGroupStoreClient.class) {
+                if (haGroupStoreClientInstance == null || !haGroupStoreClientInstance.isHealthy) {
+                    haGroupStoreClientInstance = new HAGroupStoreClient(conf, null);
+                    if (!haGroupStoreClientInstance.isHealthy) {
+                        haGroupStoreClientInstance.close();
+                        haGroupStoreClientInstance = null;
+                    }
+                }
+            }
+        }
+        return haGroupStoreClientInstance;
+    }
+
+    @VisibleForTesting
+    HAGroupStoreClient(final Configuration conf, final PathChildrenCacheListener pathChildrenCacheListener) {
+        try {
+            this.phoenixHaAdmin = new PhoenixHAAdmin(conf);
+            final PathChildrenCache pathChildrenCache = new PathChildrenCache(
+                    phoenixHaAdmin.getCurator(), ZKPaths.PATH_SEPARATOR, true);
+            final CountDownLatch latch = new CountDownLatch(1);
+            if (pathChildrenCacheListener != null) {
+                pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
+            } else {
+                pathChildrenCache.getListenable().addListener((client, event) -> {
+                    LOGGER.info("HAGroupStoreClient PathChildrenCache received event of type {}", event.getType());
+                    final ChildData childData = event.getData();
+                    ClusterRoleRecord eventCRR = extractCRROrNull(childData);
+                    switch (event.getType()) {
+                        case CHILD_ADDED:
+                        case CHILD_UPDATED:
+                            if (eventCRR != null && eventCRR.getHaGroupName() != null) {
+                                updateClusterRoleRecordMap(eventCRR);
+                            }
+                            break;
+                        case CHILD_REMOVED:
+                            // For CHILD_REMOVED, the event carries the old version of the data that was just deleted.
+                            if (eventCRR != null && eventCRR.getHaGroupName() != null
+                                    && !eventCRR.getHaGroupName().isEmpty()
+                                    && eventCRR.getRole(phoenixHaAdmin.getZkUrl()) != null) {
+                                LOGGER.info("Received CHILD_REMOVED event, removing CRR {} from existing CRR Map {}", eventCRR, clusterRoleToCRRMap);
+                                final ClusterRoleRecord.ClusterRole role = eventCRR.getRole(phoenixHaAdmin.getZkUrl());
+                                clusterRoleToCRRMap.putIfAbsent(role, new ConcurrentHashMap<>());
+                                clusterRoleToCRRMap.get(role).remove(eventCRR.getHaGroupName());
+                            }
+                            break;
+                        case INITIALIZED:
+                            latch.countDown();
+                            break;
+                        case CONNECTION_LOST:
+                        case CONNECTION_SUSPENDED:
+                            isHealthy = false;
+                            break;
+                        case CONNECTION_RECONNECTED:
+                            isHealthy = true;
+                            break;
+                        default:
+                            LOGGER.warn("Unexpected event type {}, complete event {}", event.getType(), event);
+                    }
+                });
+            }
+            pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+            this.pathChildrenCache = pathChildrenCache;
+            isHealthy = latch.await(HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            buildClusterRoleToCRRMap();
+        } catch (Exception e) {
+            isHealthy = false;
+            LOGGER.error("Unexpected error occurred while initializing HAGroupStoreClient, marking cache as unhealthy", e);
+        }
+    }
+
+    private ClusterRoleRecord extractCRROrNull(final ChildData childData) {
+        if (childData != null) {
+            byte[] data = childData.getData();
+            return ClusterRoleRecord.fromJson(data).orElse(null);
+        }
+        return null;
+    }
+
+    private void updateClusterRoleRecordMap(final ClusterRoleRecord crr) {
+        if (crr != null && crr.getHaGroupName() != null && crr.getRole(phoenixHaAdmin.getZkUrl()) != null) {
+            ClusterRoleRecord.ClusterRole role = crr.getRole(phoenixHaAdmin.getZkUrl());
+            LOGGER.info("Updating Existing CRR Map {} with new CRR {}", clusterRoleToCRRMap, crr);
+            clusterRoleToCRRMap.putIfAbsent(role, new ConcurrentHashMap<>());
+            clusterRoleToCRRMap.get(role).put(crr.getHaGroupName(), crr);
+            LOGGER.info("Added new CRR {} to CRR Map", crr);
+            // Remove any pre-existing mapping with any other role for this HAGroupName
+            for (ClusterRoleRecord.ClusterRole mapRole : clusterRoleToCRRMap.keySet()) {
+                if (mapRole != role) {
+                    ConcurrentHashMap<String, ClusterRoleRecord> roleWiseMap = clusterRoleToCRRMap.get(mapRole);
+                    if (roleWiseMap.containsKey(crr.getHaGroupName())) {
+                        LOGGER.info("Removing any pre-existing mapping with role {} for HAGroupName {}", mapRole, crr.getHaGroupName());
+                        roleWiseMap.remove(crr.getHaGroupName());
+                    }
+                }
+            }
+            LOGGER.info("Final Updated CRR Map {}", clusterRoleToCRRMap);
+        }
+    }
+
+    private void buildClusterRoleToCRRMap() {
+        List<ClusterRoleRecord> clusterRoleRecords = pathChildrenCache.getCurrentData().stream()
+                .map(this::extractCRROrNull)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+        for (ClusterRoleRecord crr : clusterRoleRecords) {
+            updateClusterRoleRecordMap(crr);
+        }
+    }
+
+    public void rebuild() throws Exception {
+        if (!isHealthy) {
+            throw new IOException("HAGroupStoreClient is not healthy");
+        }
+        LOGGER.info("Rebuilding HAGroupStoreClient for HA groups");
+        // NOTE: this is a BLOCKING method.
+        // Completely rebuild the internal cache by querying for all needed data
+        // WITHOUT generating any events to send to listeners.
+        pathChildrenCache.rebuild();
+        buildClusterRoleToCRRMap();
+        LOGGER.info("Rebuild Complete for HAGroupStoreClient");
+    }
+
+
+    @Override
+    public void close() {
+        try {
+            LOGGER.info("Closing HAGroupStoreClient");
+            clusterRoleToCRRMap.clear();
+            if (pathChildrenCache != null) {
+                pathChildrenCache.close();
+            }
+            LOGGER.info("Closed HAGroupStoreClient");
+        } catch (IOException e) {
+            LOGGER.error("Exception closing HAGroupStoreClient", e);
+        }
+    }
+
+    public List<ClusterRoleRecord> getCRRsByClusterRole(ClusterRoleRecord.ClusterRole clusterRole)
+            throws IOException {
+        if (!isHealthy) {
+            throw new IOException("HAGroupStoreClient is not healthy");
+        }
+        return clusterRoleToCRRMap.getOrDefault(clusterRole, new ConcurrentHashMap<>())
+                .values().stream().collect(ImmutableList.toImmutableList());
+    }
+}
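
A hypothetical usage sketch for the client above (imports and error handling elided; the assumption that cluster role records already exist under the client's ZK namespace is illustrative, not part of this commit):

    Configuration conf = HBaseConfiguration.create();
    // getInstance may return null if initialization does not complete within the 30s timeout.
    HAGroupStoreClient client = HAGroupStoreClient.getInstance(conf);
    if (client != null) {
        // List every HA group for which the local cluster is currently draining.
        List<ClusterRoleRecord> draining =
                client.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY);
        draining.forEach(crr -> System.out.println(crr.getHaGroupName()));
    }
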
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
new file mode 100644
index 0000000000..690953a7b0
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+
+public class HAGroupStoreManager {
+    private static volatile HAGroupStoreManager haGroupStoreManagerInstance;
+    private final boolean mutationBlockEnabled;
+    private final Configuration conf;
+
+    /**
+     * Creates/gets an instance of HAGroupStoreManager.
+     *
+     * @param conf configuration
+     * @return HAGroupStoreManager instance
+     */
+    public static HAGroupStoreManager getInstance(Configuration conf) {
+        if (haGroupStoreManagerInstance == null) {
+            synchronized (HAGroupStoreManager.class) {
+                if (haGroupStoreManagerInstance == null) {
+                    haGroupStoreManagerInstance = new HAGroupStoreManager(conf);
+                }
+            }
+        }
+        return haGroupStoreManagerInstance;
+    }
+
+    private HAGroupStoreManager(final Configuration conf) {
+        this.mutationBlockEnabled = conf.getBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED,
+                DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED);
+        this.conf = conf;
+    }
+
+    /**
+     * Checks whether mutation is blocked or not.
+     * @return true if mutation is blocked, otherwise false.
+     * @throws IOException when HAGroupStoreClient is not healthy.
+     */
+    public boolean isMutationBlocked() throws IOException {
+        if (mutationBlockEnabled) {
+            HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(conf);
+            if (haGroupStoreClient != null) {
+                return !haGroupStoreClient
+                        .getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY)
+                        .isEmpty();
+            }
+            throw new IOException("HAGroupStoreClient is not initialized");
+        }
+        return false;
+    }
+
+    /**
+     * Forces a rebuild of the underlying HAGroupStoreClient.
+     * @throws Exception if the HAGroupStoreClient is not initialized or the rebuild fails.
+     */
+    public void invalidateHAGroupStoreClient() throws Exception {
+        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(conf);
+        if (haGroupStoreClient != null) {
+            haGroupStoreClient.rebuild();
+        } else {
+            throw new IOException("HAGroupStoreClient is not initialized");
+        }
+    }
+}
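
Per the diffstat, IndexRegionObserver and UngroupedAggregateRegionObserver gain hooks that consult this manager before applying writes. A hedged sketch of that pattern (the surrounding coprocessor context and the exact exception type are assumptions, not taken from this hunk):

    // Reject writes while any HA group on this cluster is in ACTIVE_TO_STANDBY.
    HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(conf);
    if (haGroupStoreManager.isMutationBlocked()) {
        throw new IOException("Mutation is blocked: cluster is transitioning from ACTIVE to STANDBY");
    }
    // ... otherwise proceed with the mutation
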
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
index 9f27a0998d..938b60275b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
@@ -49,9 +49,7 @@ import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
index 7a82d47a93..ea026a6806 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 /**
  * An HighAvailabilityGroup provides a JDBC connection from given connection string and properties.
  */
-enum HighAvailabilityPolicy {
+public enum HighAvailabilityPolicy {
     FAILOVER {
         @Override
         public Connection provide(HighAvailabilityGroup haGroup, Properties info,
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java
new file mode 100644
index 0000000000..c9019b6c1a
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicValue;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.JDBCUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Helper class to update the cluster role record for a ZK cluster.
+ * The ZK client this accessor has is confined to a single ZK cluster, but it can be used to operate
+ * multiple HA groups that are associated with this cluster.
+ * This is not yet thread-safe: multiple threads updating CRRs at the same time can cause inconsistency.
+ */
+public class PhoenixHAAdmin implements Closeable {
+
+    /**
+     * Wrapper class for static accessor
+     */
+    public static class HighAvailibilityCuratorProvider {
+
+        public static final HighAvailibilityCuratorProvider INSTANCE = new HighAvailibilityCuratorProvider();
+        /**
+         * Gets the curator, blocking if necessary to create it
+         */
+        public CuratorFramework getCurator(String zkUrl, Properties properties) throws IOException {
+            return HighAvailabilityGroup.getCurator(zkUrl, properties);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixHAAdmin.class);
+
+    /** The fully qualified ZK URL for an HBase cluster in format host:port:/hbase */
+    private final String zkUrl;
+    /** Configuration of this admin. */
+    private final Configuration conf;
+    /** Client properties which hold copies of the configuration defining ZK timeouts / retries. */
+    private final Properties properties = new Properties();
+    /** Curator provider */
+    private final HighAvailibilityCuratorProvider highAvailibilityCuratorProvider;
+
+    public PhoenixHAAdmin(Configuration conf) {
+        this(getLocalZkUrl(conf), conf, HighAvailibilityCuratorProvider.INSTANCE);
+    }
+
+    public PhoenixHAAdmin(String zkUrl, Configuration conf) {
+        this(zkUrl, conf, HighAvailibilityCuratorProvider.INSTANCE);
+    }
+
+    public PhoenixHAAdmin(String zkUrl, Configuration conf,
+            HighAvailibilityCuratorProvider highAvailibilityCuratorProvider) {
+        Preconditions.checkNotNull(zkUrl);
+        Preconditions.checkNotNull(conf);
+        Preconditions.checkNotNull(highAvailibilityCuratorProvider);
+        this.zkUrl = JDBCUtil.formatZookeeperUrl(zkUrl);
+        this.conf = conf;
+        conf.iterator().forEachRemaining(k -> properties.setProperty(k.getKey(), k.getValue()));
+        this.highAvailibilityCuratorProvider = highAvailibilityCuratorProvider;
+    }
+
+    /**
+     * Helper method to get local ZK fully qualified URL (host:port:/hbase) from configuration.
+     */
+    public static String getLocalZkUrl(Configuration conf) {
+        String localZkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+        if (StringUtils.isEmpty(localZkQuorum)) {
+            String msg = "ZK quorum not found by looking up key " + 
HConstants.ZOOKEEPER_QUORUM;
+            LOG.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+
+        String portStr = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+        int port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
+        if (portStr != null) {
+            try {
+                port = Integer.parseInt(portStr);
+            } catch (NumberFormatException e) {
+                String msg = String.format("Unrecognized ZK port '%s' in ZK quorum '%s'",
+                        portStr, localZkQuorum);
+                LOG.error(msg, e);
+                throw new IllegalArgumentException(msg, e);
+            }
+        }
+
+        String localZkRoot = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+                HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+        return String.format("%s:%d:%s", localZkQuorum, port, localZkRoot);
+    }
+
+    /**
+     * Gets the curator from the cache if available, otherwise calls into getCurator to make it.
+     */
+    public CuratorFramework getCurator() throws IOException {
+        return highAvailibilityCuratorProvider.getCurator(zkUrl, properties);
+    }
+
+
+    /**
+     * Check if the current cluster has the ACTIVE role for the given HA group.
+     * If it fails to read the cluster role data from the current cluster, it will assume the
+     * current cluster is not ACTIVE. Callers should be aware of the "false positive" possibility,
+     * especially due to connectivity issues between this tool and the remote ZK cluster.
+     * @param haGroupName the HA group name; a cluster can be associated with multiple HA groups
+     * @return true if the current cluster has the ACTIVE role, otherwise false
+     */
+    boolean isCurrentActiveCluster(String haGroupName) {
+        try {
+            byte[] data = getCurator().getData().forPath(toPath(haGroupName));
+
+            Optional<ClusterRoleRecord> record = ClusterRoleRecord.fromJson(data);
+            return record.isPresent() && record.get()
+                    .getRole(zkUrl) == ClusterRoleRecord.ClusterRole.ACTIVE;
+        } catch (KeeperException.NoNodeException ne) {
+            LOG.info("No role record found for HA group {} on '{}', assuming it is not active",
+                    haGroupName, zkUrl);
+            return false;
+        } catch (Exception e) {
+            LOG.warn("Got exception when reading record for {} on cluster {}",
+                    haGroupName, zkUrl, e);
+            return false;
+        }
+    }
+
+    /**
+     * Lists all cluster role records stored in the ZooKeeper znodes.
+     * This is a read-only operation and hence has no side effect on the ZK cluster.
+     */
+    public List<ClusterRoleRecord> listAllClusterRoleRecordsOnZookeeper() throws IOException {
+        List<String> haGroupNames;
+        try {
+            haGroupNames = getCurator().getChildren().forPath(ZKPaths.PATH_SEPARATOR);
+        } catch (Exception e) {
+            String msg = String.format("Got exception when listing all HA groups in %s", zkUrl);
+            LOG.error(msg);
+            throw new IOException(msg, e);
+        }
+
+        List<ClusterRoleRecord> records = new ArrayList<>();
+        List<String> failedHaGroups = new ArrayList<>();
+        for (String haGroupName : haGroupNames) {
+            try {
+                byte[] data = getCurator().getData().forPath(ZKPaths.PATH_SEPARATOR + haGroupName);
+                Optional<ClusterRoleRecord> record = ClusterRoleRecord.fromJson(data);
+                if (record.isPresent()) {
+                    records.add(record.get());
+                } else { // failed to deserialize data from JSON
+                    failedHaGroups.add(haGroupName);
+                }
+            } catch (Exception e) {
+                LOG.warn("Got exception when reading data for HA group {}", haGroupName, e);
+                failedHaGroups.add(haGroupName);
+            }
+        }
+
+        if (!failedHaGroups.isEmpty()) {
+            String msg = String.format("Found following HA groups: %s. Failed to read cluster"
+                    + " role records for following HA groups: %s",
+                    String.join(",", haGroupNames), String.join(",", failedHaGroups));
+            LOG.error(msg);
+            throw new IOException(msg);
+        }
+        return records;
+    }
+
+    /**
+     * Helper method to write the given cluster role records into the respective ZK clusters.
+     *
+     * // TODO: add retry logic
+     *
+     * @param records The cluster role record list to save on ZK
+     * @param forceful if true, this method will ignore errors on other clusters; otherwise it will
+     *                 not update the next cluster (in order) if there is any failure on the current cluster
+     * @return a map of HA group name to the list of cluster URLs for which the cluster role record failed to write
+     */
+    public Map<String, List<String>> syncClusterRoleRecords(List<ClusterRoleRecord> records,
+            boolean forceful) throws IOException {
+        Map<String, List<String>> failedHaGroups = new HashMap<>();
+        for (ClusterRoleRecord record : records) {
+            String haGroupName = record.getHaGroupName();
+            try (PhoenixHAAdmin admin1 = new PhoenixHAAdmin(record.getZk1(), conf, HighAvailibilityCuratorProvider.INSTANCE);
+                    PhoenixHAAdmin admin2 = new PhoenixHAAdmin(record.getZk2(), conf, HighAvailibilityCuratorProvider.INSTANCE)) {
+                // Update the previously ACTIVE cluster first.
+                // It reduces the chances of split-brain between clients and clusters.
+                // If the previous ACTIVE cluster cannot be determined, update the new STANDBY cluster first.
+                final PairOfSameType<PhoenixHAAdmin> pair;
+                if (admin1.isCurrentActiveCluster(haGroupName)) {
+                    pair = new PairOfSameType<>(admin1, admin2);
+                } else if (admin2.isCurrentActiveCluster(haGroupName)) {
+                    pair = new PairOfSameType<>(admin2, admin1);
+                } else if (record.getRole(admin1.getZkUrl()) == ClusterRoleRecord.ClusterRole.STANDBY) {
+                    pair = new PairOfSameType<>(admin1, admin2);
+                } else {
+                    pair = new PairOfSameType<>(admin2, admin1);
+                }
+                try {
+                    pair.getFirst().createOrUpdateDataOnZookeeper(record);
+                } catch (IOException e) {
+                    LOG.error("Failed to create or update data on ZooKeeper, cluster={}, record={}",
+                            pair.getFirst(), record, e);
+                    failedHaGroups.computeIfAbsent(haGroupName, (k) -> new ArrayList<>())
+                            .add(pair.getFirst().getZkUrl());
+                    if (!forceful) {
+                        LOG.error("-forceful option is not enabled by command line options, "
+                                + "skip writing record {} to ZK clusters", record);
+                        // skip writing this record to the second ZK cluster, so we should report that
+                        failedHaGroups.computeIfAbsent(haGroupName, (k) -> new ArrayList<>())
+                                .add(pair.getSecond().getZkUrl());
+                        continue; // do not update this record on the second cluster
+                    }
+                }
+                try {
+                    pair.getSecond().createOrUpdateDataOnZookeeper(record);
+                } catch (IOException e) {
+                    LOG.error("Failed to create or update data on ZooKeeper, cluster={}, record={}",
+                            pair.getSecond(), record, e);
+                    failedHaGroups.computeIfAbsent(haGroupName, (k) -> new ArrayList<>())
+                            .add(pair.getSecond().getZkUrl());
+                }
+            }
+        }
+        return failedHaGroups;
+    }
+
+    /**
+     * Verify cluster role records stored in local ZK znodes, and repair any inconsistency against
+     * the remote znodes.
+     * @return a list of HA group names with inconsistent cluster role records, or an empty list
+     */
+    List<String> verifyAndRepairWithRemoteZnode() throws Exception {
+        List<String> inconsistentHaGroups = new ArrayList<>();
+        for (ClusterRoleRecord record : listAllClusterRoleRecordsOnZookeeper()) {
+            // the remote znodes may be on different ZK clusters.
+            if (record.getRole(zkUrl) == ClusterRoleRecord.ClusterRole.UNKNOWN) {
+                LOG.warn("Unknown cluster role for cluster '{}' in record {}", zkUrl, record);
+                continue;
+            }
+            String remoteZkUrl = record.getZk1().equals(zkUrl) ? record.getZk2() : record.getZk1();
+            try (PhoenixHAAdmin remoteAdmin = new PhoenixHAAdmin(remoteZkUrl, conf,
+                    HighAvailibilityCuratorProvider.INSTANCE)) {
+                ClusterRoleRecord remoteRecord;
+                try {
+                    String zPath = toPath(record.getHaGroupName());
+                    byte[] data = remoteAdmin.getCurator().getData().forPath(zPath);
+                    Optional<ClusterRoleRecord> recordOptional = ClusterRoleRecord.fromJson(data);
+                    if (!recordOptional.isPresent()) {
+                        remoteAdmin.createOrUpdateDataOnZookeeper(record);
+                        continue;
+                    }
+                    remoteRecord = recordOptional.get();
+                } catch (KeeperException.NoNodeException ne) {
+                    LOG.warn("No record znode yet, creating for HA group {} on {}",
+                            record.getHaGroupName(), remoteAdmin);
+                    remoteAdmin.createDataOnZookeeper(record);
+                    LOG.info("Created znode on cluster {} with record {}", remoteAdmin, record);
+                    continue;
+                } catch (Exception e) {
+                    LOG.error("Failed to get data on remote cluster {} for HA group {}",
+                            remoteAdmin, record.getHaGroupName(), e);
+                    continue;
+                }
+
+                if (!record.getHaGroupName().equals(remoteRecord.getHaGroupName())) {
+                    inconsistentHaGroups.add(record.getHaGroupName());
+                    LOG.error("INTERNAL ERROR: got cluster role record for different HA groups."
+                            + " Local record: {}, remote record: {}", record, remoteRecord);
+                } else if (remoteRecord.isNewerThan(record)) {
+                    createOrUpdateDataOnZookeeper(remoteRecord);
+                } else if (record.isNewerThan(remoteRecord)) {
+                    remoteAdmin.createOrUpdateDataOnZookeeper(record);
+                } else if (record.equals(remoteRecord)) {
+                    LOG.info("Cluster role record {} is consistent", record);
+                } else {
+                    inconsistentHaGroups.add(record.getHaGroupName());
+                    LOG.error("Cluster role record for HA group {} is inconsistent. On cluster {}"
+                            + " the record is {}; on cluster {} the record is {}",
+                            record.getHaGroupName(), this, record, remoteAdmin, remoteRecord);
+                }
+            }
+        }
+        return inconsistentHaGroups;
+    }
+
+    /**
+     * Updates the cluster role data on the ZooKeeper cluster it connects to.
+     * To avoid conflicts, it does CAS (compare-and-set) when updating.  The constraint is that the
+     * given record's version should be larger than the existing record's version.  This is a way to
+     * help avoid manual update conflicts.  If the given record cannot pass the version check, the
+     * update request is rejected and the client (human operator or external system) should retry.
+     * @param record the new cluster role record to be saved on ZK
+     * @return true if the data on ZK is updated, otherwise false
+     * @throws IOException if it fails to update the cluster role data on ZK
+     */
+    public boolean createOrUpdateDataOnZookeeper(ClusterRoleRecord record) throws IOException {
+        if (!zkUrl.equals(record.getZk1()) && !zkUrl.equals(record.getZk2())) {
+            String msg = String.format("INTERNAL ERROR: ZK cluster is not associated with cluster"
+                    + " role record! ZK cluster URL: '%s'. Cluster role record: %s", zkUrl, record);
+            LOG.error(msg);
+            throw new IOException(msg);
+        }
+
+        String haGroupName = record.getHaGroupName();
+        byte[] data;
+        try {
+            data = getCurator().getData().forPath(toPath(haGroupName)); // Get initial data
+        } catch (KeeperException.NoNodeException ne) {
+            LOG.info("No record znode yet, creating for HA group {} on {}", haGroupName, zkUrl);
+            createDataOnZookeeper(record);
+            LOG.info("Created znode for HA group {} with record data {} on {}",
+                    haGroupName, record, zkUrl);
+            return true;
+        } catch (Exception e) {
+            String msg = String.format("Failed to read cluster role record data for HA group %s"
+                    + " on cluster '%s'", haGroupName, zkUrl);
+            LOG.error(msg, e);
+            throw new IOException(msg, e);
+        }
+
+        Optional<ClusterRoleRecord> existingRecordOptional = ClusterRoleRecord.fromJson(data);
+        if (!existingRecordOptional.isPresent()) {
+            String msg = String.format("Failed to parse existing cluster role record data for HA"
+                    + " group %s", haGroupName);
+            LOG.error(msg);
+            throw new IOException(msg);
+        }
+
+        ClusterRoleRecord existingRecord = existingRecordOptional.get();
+        if (record.getVersion() < existingRecord.getVersion()) {
+            String msg = String.format("Invalid new cluster role record for HA group '%s' because"
+                    + " the new record's version V%d is smaller than the existing V%d."
+                    + " Existing role record: %s. New role record failing to save: %s",
+                    haGroupName, record.getVersion(), existingRecord.getVersion(),
+                    existingRecord, record);
+            LOG.warn(msg);
+            return false; // return instead of erroring out, to tolerate stale update requests
+        }
+
+        if (record.getVersion() == existingRecord.getVersion()) {
+            if (record.equals(existingRecord)) {
+                LOG.debug("Cluster role does not change since last update on ZK.");
+                return false; // no need to update iff they are the same.
+            } else {
+                String msg = String.format("Invalid new cluster role record for HA group '%s'"
+                        + " because it has the same version V%d but inconsistent data."
+                        + " Existing role record: %s. New role record failing to save: %s",
+                        haGroupName, record.getVersion(), existingRecord, record);
+                LOG.error(msg);
+                throw new IOException(msg);
+            }
+        }
+
+        return updateDataOnZookeeper(existingRecord, record);
+    }
+
+    /**
+     * Helper to create the znode on the ZK cluster.
+     */
+    private void createDataOnZookeeper(ClusterRoleRecord record) throws IOException {
+        String haGroupName = record.getHaGroupName();
+        // znode path for the given HA group name, assuming the namespace (prefix) has been set.
+        String haGroupPath = toPath(haGroupName);
+        try {
+            getCurator().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+                    .forPath(haGroupPath, ClusterRoleRecord.toJson(record));
+        } catch (KeeperException.NodeExistsException nee) {
+            // This method assumes that the znode doesn't exist yet, but it could have been
+            // created between now and the last time we checked. We swallow the exception and
+            // rely on our caller to check that the znode that got saved is correct.
+            LOG.warn("Znode for HA group {} already exists.", haGroupPath, nee);
+        } catch (Exception e) {
+            LOG.error("Failed to initialize the znode for HA group {} with record data {}",
+                    haGroupPath, record, e);
+            throw new IOException("Failed to initialize znode for HA group " + haGroupPath, e);
+        }
+    }
+
+    /**
+     * Helper to update the znode on the ZK cluster, assuming the current data is the given old record.
+     */
+    private boolean updateDataOnZookeeper(ClusterRoleRecord oldRecord, ClusterRoleRecord newRecord)
+            throws IOException {
+        // znode path for the given HA group name, assuming the namespace (prefix) has been set.
+        String haGroupPath = toPath(newRecord.getHaGroupName());
+        RetryPolicy retryPolicy = HighAvailabilityGroup.createRetryPolicy(properties);
+        try {
+            DistributedAtomicValue v = new DistributedAtomicValue(getCurator(), haGroupPath, retryPolicy);
+            AtomicValue<byte[]> result = v.compareAndSet(ClusterRoleRecord.toJson(oldRecord),
+                    ClusterRoleRecord.toJson(newRecord));
+            LOG.info("Updated cluster role record ({}->{}) for HA group {} on cluster '{}': {}",
+                    oldRecord.getVersion(), newRecord.getVersion(), newRecord.getHaGroupName(),
+                    zkUrl, result.succeeded() ? "succeeded" : "failed");
+            LOG.debug("Old DistributedAtomicValue: {}, New DistributedAtomicValue: {}",
+                    new String(result.preValue(), StandardCharsets.UTF_8),
+                    new String(result.postValue(), StandardCharsets.UTF_8));
+            return result.succeeded();
+        } catch (Exception e) {
+            String msg = String.format("Failed to update cluster role record to ZK for the HA"
+                    + " group %s due to '%s'. Existing role record: %s. New role record failing"
+                    + " to save: %s", haGroupPath, e.getMessage(), oldRecord, newRecord);
+            LOG.error(msg, e);
+            throw new IOException(msg, e);
+        }
+    }
+
+    /**
+     * Helper method to get ZK path for an HA group given the HA group name.
+     * It assumes the ZK namespace (prefix) has been set.
+     */
+    public static String toPath(String haGroupName) {
+        return ZKPaths.PATH_SEPARATOR + haGroupName;
+    }
+
+    public String getZkUrl() {
+        return zkUrl;
+    }
+
+    @Override
+    public void close() {
+        LOG.debug("PhoenixHAAdmin for {} is now closed.", zkUrl);
+    }
+
+    @Override
+    public String toString() {
+        return zkUrl;
+    }
+}
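
A sketch of one graceful-failover step using the admin above: write a record with a bumped version so the CAS path in createOrUpdateDataOnZookeeper accepts it. The ClusterRoleRecord constructor arguments shown are assumptions inferred from the fields this file reads (HA group name, policy, the two ZK URLs with their roles, and a version), not a signature confirmed by this diff:

    try (PhoenixHAAdmin admin = new PhoenixHAAdmin(conf)) {
        // Hypothetical record: same HA group and clusters, the local role moved to
        // ACTIVE_TO_STANDBY, and the version bumped past the existing record's.
        ClusterRoleRecord next = new ClusterRoleRecord("group1", HighAvailabilityPolicy.FAILOVER,
                "zk1:2181:/hbase", ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
                "zk2:2181:/hbase", ClusterRoleRecord.ClusterRole.STANDBY,
                currentVersion + 1);
        boolean saved = admin.createOrUpdateDataOnZookeeper(next); // false if the CAS lost a race
    }
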
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
index 5da3ea1593..a59dc9fd8d 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
@@ -17,18 +17,11 @@
  */
 package org.apache.phoenix.jdbc;
 
-import java.io.Closeable;
 import java.io.FileReader;
-import java.io.IOException;
 import java.io.Reader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
 
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
@@ -38,25 +31,14 @@ import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.atomic.AtomicValue;
-import org.apache.curator.framework.recipes.atomic.DistributedAtomicValue;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
-import org.apache.phoenix.util.JDBCUtil;
+import org.apache.phoenix.jdbc.PhoenixHAAdmin.HighAvailibilityCuratorProvider;
 import org.apache.phoenix.util.JacksonUtil;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
@@ -109,9 +91,10 @@ public class PhoenixHAAdminTool extends Configured implements Tool {
             if (commandLine.hasOption(HELP_OPT.getOpt())) {
                 printUsageMessage();
                 return RET_SUCCESS;
-            } else if (commandLine.hasOption(LIST_OPT.getOpt())) { // list
-                String zkUrl = getLocalZkUrl(getConf()); // Admin is created against local ZK cluster
-                try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
+            }
+            String zkUrl = PhoenixHAAdmin.getLocalZkUrl(getConf()); // Admin is created against local ZK cluster
+            if (commandLine.hasOption(LIST_OPT.getOpt())) { // list
+                try (PhoenixHAAdmin admin = new PhoenixHAAdmin(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
                     List<ClusterRoleRecord> records = admin.listAllClusterRoleRecordsOnZookeeper();
                     JacksonUtil.getObjectWriterPretty().writeValue(System.out, records);
                 }
@@ -119,16 +102,17 @@ public class PhoenixHAAdminTool extends Configured implements Tool {
                 String fileName = commandLine.getOptionValue(MANIFEST_OPT.getOpt());
                 List<ClusterRoleRecord> records = readRecordsFromFile(fileName);
                 boolean forceful = commandLine.hasOption(FORCEFUL_OPT.getOpt());
-                Map<String, List<String>> failedHaGroups = syncClusterRoleRecords(records, forceful);
-                if (!failedHaGroups.isEmpty()) {
-                    System.out.println("Found following HA groups are failing to write the clusters:");
-                    failedHaGroups.forEach((k, v) ->
-                            System.out.printf("%s -> [%s]\n", k, String.join(",", v)));
-                    return RET_SYNC_ERROR;
+                try (PhoenixHAAdmin admin = new PhoenixHAAdmin(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
+                    Map<String, List<String>> failedHaGroups = admin.syncClusterRoleRecords(records, forceful);
+                    if (!failedHaGroups.isEmpty()) {
+                        System.out.println("Found following HA groups are failing to write the clusters:");
+                        failedHaGroups.forEach((k, v) ->
+                                System.out.printf("%s -> [%s]\n", k, String.join(",", v)));
+                        return RET_SYNC_ERROR;
+                    }
                 }
             } else if (commandLine.hasOption(REPAIR_OPT.getOpt()))  { // verify and repair
-                String zkUrl = getLocalZkUrl(getConf()); // Admin is created against local ZK cluster
-                try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
+                try (PhoenixHAAdmin admin = new PhoenixHAAdmin(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
                     List<String> inconsistentRecord = admin.verifyAndRepairWithRemoteZnode();
                     if (!inconsistentRecord.isEmpty()) {
                         System.out.println("Found following inconsistent cluster role records: ");
@@ -171,64 +155,6 @@ public class PhoenixHAAdminTool extends Configured implements Tool {
         }
     }
 
-    /**
-     * Helper method to write the given cluster role records into the ZK clusters respectively.
-     *
-     * // TODO: add retry logics
-     *
-     * @param records The cluster role record list to save on ZK
-     * @param forceful if true, this method will ignore errors on other clusters; otherwise it will
-     *                 not update next cluster (in order) if there is any failure on current cluster
-     * @return a map of HA group name to list cluster's url for cluster role record failing to write
-     */
-    private Map<String, List<String>> syncClusterRoleRecords(List<ClusterRoleRecord> records,
-            boolean forceful) throws IOException {
-        Map<String, List<String>> failedHaGroups = new HashMap<>();
-        for (ClusterRoleRecord record : records) {
-            String haGroupName = record.getHaGroupName();
-            try (PhoenixHAAdminHelper admin1 = new PhoenixHAAdminHelper(record.getZk1(), getConf(), HighAvailibilityCuratorProvider.INSTANCE);
-                    PhoenixHAAdminHelper admin2 = new PhoenixHAAdminHelper(record.getZk2(), getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
-                // Update the cluster previously ACTIVE cluster first.
-                // It reduces the chances of split-brain between clients and clusters.
-                // If can not determine previous ACTIVE cluster, update new STANDBY cluster first.
-                final PairOfSameType<PhoenixHAAdminHelper> pair;
-                if (admin1.isCurrentActiveCluster(haGroupName)) {
-                    pair = new PairOfSameType<>(admin1, admin2);
-                } else if (admin2.isCurrentActiveCluster(haGroupName)) {
-                    pair = new PairOfSameType<>(admin2, admin1);
-                } else if (record.getRole(admin1.getZkUrl()) == ClusterRole.STANDBY) {
-                    pair = new PairOfSameType<>(admin1, admin2);
-                } else {
-                    pair = new PairOfSameType<>(admin2, admin1);
-                }
-                try {
-                    pair.getFirst().createOrUpdateDataOnZookeeper(record);
-                } catch (IOException e) {
-                    LOG.error("Error to create or update data on Zookeeper, cluster={}, record={}",
-                            pair.getFirst(), record);
-                    failedHaGroups.computeIfAbsent(haGroupName, (k) -> new ArrayList<>())
-                            .add(pair.getFirst().zkUrl);
-                    if (!forceful) {
-                        LOG.error("-forceful option is not enabled by command line options, "
-                                + "skip writing record {} to ZK clusters", record);
-                        // skip writing this record to second ZK cluster, so we should report that
-                        failedHaGroups.computeIfAbsent(haGroupName, (k) -> new ArrayList<>())
-                                .add(pair.getSecond().zkUrl);
-                        continue; // do not update this record on second cluster
-                    }
-                }
-                try {
-                    pair.getSecond().createOrUpdateDataOnZookeeper(record);
-                } catch (IOException e) {
-                    LOG.error("Error to create or update data on Zookeeper, cluster={}, record={}",
-                            pair.getFirst(), record);
-                    failedHaGroups.computeIfAbsent(haGroupName, (k) -> new ArrayList<>())
-                            .add(pair.getSecond().zkUrl);
-                }
-            }
-        }
-        return failedHaGroups;
-    }
 
     /**
      * Parses the commandline arguments, throw exception if validation fails.
@@ -268,370 +194,6 @@ public class PhoenixHAAdminTool extends Configured implements Tool {
         formatter.printHelp("help", OPTIONS);
     }
 
-    /**
-     * Helper method to get local ZK fully qualified URL (host:port:/hbase) 
from configuration.
-     */
-    public static String getLocalZkUrl(Configuration conf) {
-        String localZkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
-        if (StringUtils.isEmpty(localZkQuorum)) {
-            String msg = "ZK quorum not found by looking up key " + 
HConstants.ZOOKEEPER_QUORUM;
-            LOG.error(msg);
-            throw new IllegalArgumentException(msg);
-        }
-
-        String portStr = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-        int port = HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
-        if (portStr != null) {
-            try {
-                port = Integer.parseInt(portStr);
-            } catch (NumberFormatException e) {
-                String msg = String.format("Unrecognized ZK port '%s' in ZK 
quorum '%s'",
-                        portStr, localZkQuorum);
-                LOG.error(msg, e);
-                throw new IllegalArgumentException(msg, e);
-            }
-        }
-
-        String localZkRoot = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
-                HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
-
-        return String.format("%s:%d:%s", localZkQuorum, port, localZkRoot);
-    }
-
-    /**
-     * Wrapper class for static accessor
-     */
-    @VisibleForTesting
-    static class HighAvailibilityCuratorProvider {
-
-        public static final HighAvailibilityCuratorProvider INSTANCE = new HighAvailibilityCuratorProvider();
-
-        /**
-         * Gets curator blocking if necessary to create it
-         */
-        public CuratorFramework getCurator(String zkUrl, Properties properties) throws IOException {
-            return HighAvailabilityGroup.getCurator(zkUrl, properties);
-        }
-    }
-
-    /**
-     * Helper class to update cluster role record for a ZK cluster.
-     *
-     * The ZK client this accessor has is confined to a single ZK cluster, but it can be used to
-     * operate multiple HA groups that are associated with this cluster.
-     */
-    @VisibleForTesting
-    static class PhoenixHAAdminHelper implements Closeable {
-        /** The fully qualified ZK URL for an HBase cluster in format host:port:/hbase */
-        private final String zkUrl;
-        /** Configuration of this command line tool. */
-        private final Configuration conf;
-        /** Client properties which has copies of configuration defining ZK timeouts / retries. */
-        private final Properties properties = new Properties();
-        /** Curator Provider **/
-        private final HighAvailibilityCuratorProvider highAvailibilityCuratorProvider;
-
-        PhoenixHAAdminHelper(String zkUrl, Configuration conf, HighAvailibilityCuratorProvider highAvailibilityCuratorProvider) {
-            Preconditions.checkNotNull(zkUrl);
-            Preconditions.checkNotNull(conf);
-            Preconditions.checkNotNull(highAvailibilityCuratorProvider);
-            this.zkUrl = JDBCUtil.formatZookeeperUrl(zkUrl);
-            this.conf = conf;
-            conf.iterator().forEachRemaining(k -> properties.setProperty(k.getKey(), k.getValue()));
-            this.highAvailibilityCuratorProvider = highAvailibilityCuratorProvider;
-        }
-
-        /**
-         * Gets curator from the cache if available otherwise calls into getCurator to make it.
-         */
-        private CuratorFramework getCurator() throws IOException {
-            return highAvailibilityCuratorProvider.getCurator(zkUrl, properties);
-        }
-
-        /**
-         * Check if current cluster is ACTIVE role for the given HA group.
-         *
-         * In case of Exception when it fails to read cluster role data from the current cluster, it
-         * will assume current cluster is not ACTIVE. Callers should be aware of "false positive"
-         * possibility especially due to connectivity issue between this tool and remote ZK cluster.
-         *
-         * @param haGroupName the HA group name; a cluster can be associated with multiple HA groups
-         * @return true if current cluster is ACTIVE role, otherwise false
-         */
-        private boolean isCurrentActiveCluster(String haGroupName) {
-            try {
-                byte[] data = getCurator().getData().forPath(toPath(haGroupName));
-
-                Optional<ClusterRoleRecord> record = ClusterRoleRecord.fromJson(data);
-                return record.isPresent() && record.get().getRole(zkUrl) == ClusterRole.ACTIVE;
-            } catch (NoNodeException ne) {
-                LOG.info("No role record found for HA group {} on '{}', assuming it is not active",
-                        haGroupName, zkUrl);
-                return false;
-            } catch (Exception e) {
-                LOG.warn("Got exception when reading record for {} on cluster {}",
-                        haGroupName, zkUrl, e);
-                return false;
-            }
-        }
-
-        /**
-         * This lists all cluster role records stored in the zookeeper nodes.
-         *
-         * This read-only operation and hence no side effect on the ZK cluster.
-         */
-        List<ClusterRoleRecord> listAllClusterRoleRecordsOnZookeeper() throws IOException {
-            List<String> haGroupNames;
-            try {
-                haGroupNames = getCurator().getChildren().forPath(ZKPaths.PATH_SEPARATOR);
-            } catch (Exception e) {
-                String msg = String.format("Got exception when listing all HA groups in %s", zkUrl);
-                LOG.error(msg);
-                throw new IOException(msg, e);
-            }
-
-            List<ClusterRoleRecord> records = new ArrayList<>();
-            List<String> failedHaGroups = new ArrayList<>();
-            for (String haGroupName : haGroupNames) {
-                try {
-                    byte[] data = getCurator().getData().forPath(ZKPaths.PATH_SEPARATOR + haGroupName);
-                    Optional<ClusterRoleRecord> record = ClusterRoleRecord.fromJson(data);
-                    if (record.isPresent()) {
-                        records.add(record.get());
-                    } else { // fail to deserialize data from JSON
-                        failedHaGroups.add(haGroupName);
-                    }
-                } catch (Exception e) {
-                    LOG.warn("Got exception when reading data for HA group {}", haGroupName, e);
-                    failedHaGroups.add(haGroupName);
-                }
-            }
-
-            if (!failedHaGroups.isEmpty()) {
-                String msg = String.format("Found following HA groups: %s. Fail to read cluster "
-                                + "role records for following HA groups: %s",
-                        String.join(",", haGroupNames), String.join(",", failedHaGroups));
-                LOG.error(msg);
-                throw new IOException(msg);
-            }
-            return records;
-        }
-
-        /**
-         * Verify cluster role records stored in local ZK nodes, and repair with remote znodes for
-         * any inconsistency.
-         *
-         * @return a list of HA group names with inconsistent cluster role records, or empty list
-         */
-        List<String> verifyAndRepairWithRemoteZnode() throws Exception {
-            List<String> inconsistentHaGroups = new ArrayList<>();
-            for (ClusterRoleRecord record : listAllClusterRoleRecordsOnZookeeper()) {
-                // the remote znodes may be on different ZK clusters.
-                if (record.getRole(zkUrl) == ClusterRole.UNKNOWN) {
-                    LOG.warn("Unknown cluster role for cluster '{}' in record {}", zkUrl, record);
-                    continue;
-                }
-                String remoteZkUrl = record.getZk1().equals(zkUrl)
-                        ? record.getZk2()
-                        : record.getZk1();
-                try (PhoenixHAAdminHelper remoteAdmin = new PhoenixHAAdminHelper(remoteZkUrl, conf, HighAvailibilityCuratorProvider.INSTANCE)) {
-                    ClusterRoleRecord remoteRecord;
-                    try {
-                        String zPath = toPath(record.getHaGroupName());
-                        byte[] data = remoteAdmin.getCurator().getData().forPath(zPath);
-                        Optional<ClusterRoleRecord> recordOptional = ClusterRoleRecord.fromJson(data);
-                        if (!recordOptional.isPresent()) {
-                            remoteAdmin.createOrUpdateDataOnZookeeper(record);
-                            continue;
-                        }
-                        remoteRecord = recordOptional.get();
-                    } catch (NoNodeException ne) {
-                        LOG.warn("No record znode yet, creating for HA group {} on {}",
-                                record.getHaGroupName(), remoteAdmin);
-                        remoteAdmin.createDataOnZookeeper(record);
-                        LOG.info("Created znode on cluster {} with record {}", remoteAdmin, record);
-                        continue;
-                    } catch (Exception e) {
-                        LOG.error("Error to get data on remote cluster {} for HA group {}",
-                                remoteAdmin, record.getHaGroupName(), e);
-                        continue;
-                    }
-
-                    if (!record.getHaGroupName().equals(remoteRecord.getHaGroupName())) {
-                        inconsistentHaGroups.add(record.getHaGroupName());
-                        LOG.error("INTERNAL ERROR: got cluster role record for different HA groups."
-                                + " Local record: {}, remote record: {}", record, remoteRecord);
-                    } else if (remoteRecord.isNewerThan(record)) {
-                        createOrUpdateDataOnZookeeper(remoteRecord);
-                    } else if (record.isNewerThan(remoteRecord)) {
-                        remoteAdmin.createOrUpdateDataOnZookeeper(record);
-                    } else if (record.equals(remoteRecord)) {
-                        LOG.info("Cluster role record {} is consistent", record);
-                    } else {
-                        inconsistentHaGroups.add(record.getHaGroupName());
-                        LOG.error("Cluster role record for HA group {} is inconsistent. On cluster "
-                                        + "{} the record is {}; on cluster {} the record is {}",
-                                record.getHaGroupName(), this, record, remoteAdmin, remoteRecord);
-                    }
-                }
-            }
-            return inconsistentHaGroups;
-        }
-
-        /**
-         * This updates the cluster role data on the zookeeper it connects to.
-         *
-         * To avoid conflicts, it does CAS (compare-and-set) when updating.  The constraint is that
-         * the given record's version should be larger the existing record's version.  This is a way
-         * to help avoiding manual update conflicts.  If the given record can not meet version
-         * check, it will reject the update request and client (human operator or external system)
-         * should retry.
-         *
-         * @param record the new cluster role record to be saved on ZK
-         * @throws IOException if it fails to update the cluster role data on ZK
-         * @return true if the data on ZK is updated otherwise false
-         */
-        boolean createOrUpdateDataOnZookeeper(ClusterRoleRecord record) throws IOException {
-            if (!zkUrl.equals(record.getZk1()) && !zkUrl.equals(record.getZk2())) {
-                String msg = String.format("INTERNAL ERROR: "
-                                + "ZK cluster is not associated with cluster role record! "
-                                + "ZK cluster URL: '%s'. Cluster role record: %s",
-                        zkUrl, record);
-                LOG.error(msg);
-                throw new IOException(msg);
-            }
-
-            String haGroupName = record.getHaGroupName();
-            byte[] data;
-            try {
-                data = getCurator().getData().forPath(toPath(haGroupName)); // Get initial data
-            } catch (NoNodeException ne) {
-                LOG.info("No record znode yet, creating for HA group {} on {}", haGroupName, zkUrl);
-                createDataOnZookeeper(record);
-                LOG.info("Created znode for HA group {} with record data {} on {}", haGroupName,
-                        record, zkUrl);
-                return true;
-            } catch (Exception e) {
-                String msg = String.format("Fail to read cluster role record data for HA group %s "
-                        + "on cluster '%s'", haGroupName, zkUrl);
-                LOG.error(msg, e);
-                throw new IOException(msg, e);
-            }
-
-            Optional<ClusterRoleRecord> existingRecordOptional = ClusterRoleRecord.fromJson(data);
-            if (!existingRecordOptional.isPresent()) {
-                String msg = String.format("Fail to parse existing cluster role record data for HA "
-                        + "group %s", haGroupName);
-                LOG.error(msg);
-                throw new IOException(msg);
-            }
-
-            ClusterRoleRecord existingRecord = existingRecordOptional.get();
-            if (record.getVersion() < existingRecord.getVersion()) {
-                String msg = String.format("Invalid new cluster role record for HA group '%s' "
-                                + "because new record's version V%d is smaller than existing V%d. "
-                                + "Existing role record: %s. New role record fail to save: %s",
-                        haGroupName, record.getVersion(), existingRecord.getVersion(),
-                        existingRecord, record);
-                LOG.warn(msg);
-                return false; // return instead of error out to tolerate
-            }
-
-            if (record.getVersion() == existingRecord.getVersion()) {
-                if (record.equals(existingRecord)) {
-                    LOG.debug("Cluster role does not change since last update on ZK.");
-                    return false; // no need to update iff they are the same.
-                } else {
-                    String msg = String.format("Invalid new cluster role record for HA group '%s' "
-                                    + "because it has the same version V%d but inconsistent data. "
-                                    + "Existing role record: %s. New role record fail to save: %s",
-                            haGroupName, record.getVersion(), existingRecord, record);
-                    LOG.error(msg);
-                    throw new IOException(msg);
-                }
-            }
-
-            return updateDataOnZookeeper(existingRecord, record);
-        }
-
-        /**
-         * Helper to create the znode on the ZK cluster.
-         */
-        private void createDataOnZookeeper(ClusterRoleRecord record) throws IOException {
-            String haGroupName = record.getHaGroupName();
-            // znode path for given haGroup name assuming namespace (prefix) has been set.
-            String haGroupPath = toPath(haGroupName);
-            try {
-                getCurator().create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(haGroupPath, ClusterRoleRecord.toJson(record));
-            } catch (NodeExistsException nee) {
-                //this method assumes that the znode doesn't exist yet, but it could have been
-                //created between now and the last time we checked. We swallow the exception and
-                //rely on our caller to check to make sure the znode that's saved is correct
-                LOG.warn("Znode for HA group {} already exists. ",
-                        haGroupPath, nee);
-            } catch (Exception e) {
-                LOG.error("Fail to initialize the znode for HA group {} with record data {}",
-                        haGroupPath, record, e);
-                throw new IOException("Fail to initialize znode for HA group " + haGroupPath, e);
-            }
-        }
-
-        /**
-         * Helper to update the znode on ZK cluster assuming current data is the given old record.
-         */
-        private boolean updateDataOnZookeeper(ClusterRoleRecord oldRecord,
-                ClusterRoleRecord newRecord) throws IOException {
-            // znode path for given haGroup name assuming namespace (prefix) has been set.
-            String haGroupPath = toPath(newRecord.getHaGroupName());
-            RetryPolicy retryPolicy = HighAvailabilityGroup.createRetryPolicy(properties);
-            try {
-                DistributedAtomicValue v = new DistributedAtomicValue(getCurator(), haGroupPath, retryPolicy);
-                AtomicValue<byte[]> result = v.compareAndSet(
-                        ClusterRoleRecord.toJson(oldRecord), ClusterRoleRecord.toJson(newRecord));
-                LOG.info("Updated cluster role record ({}->{}) for HA group {} on cluster '{}': {}",
-                        oldRecord.getVersion(), newRecord.getVersion(), newRecord.getHaGroupName(),
-                        zkUrl, result.succeeded() ? "succeeded" : "failed");
-                LOG.debug("Old DistributedAtomicValue: {}, New DistributedAtomicValue: {},",
-                        new String(result.preValue(), StandardCharsets.UTF_8),
-                        new String(result.postValue(), StandardCharsets.UTF_8));
-                return result.succeeded();
-            } catch (Exception e) {
-                String msg = String.format("Fail to update cluster role record to ZK for the HA "
-                                + "group %s due to '%s'."
-                                + "Existing role record: %s. New role record fail to save: %s",
-                        haGroupPath, e.getMessage(), oldRecord, newRecord);
-                LOG.error(msg, e);
-                throw new IOException(msg, e);
-            }
-        }
-
-        /**
-         * Helper method to get ZK path for an HA group given the HA group name.
-         *
-         * It assumes the ZK namespace (prefix) has been set.
-         */
-        private static String toPath(String haGroupName) {
-            return ZKPaths.PATH_SEPARATOR + haGroupName;
-        }
-
-        String getZkUrl() {
-            return zkUrl;
-        }
-
-        @Override
-        public void close() {
-            LOG.debug("PhoenixHAAdmin for {} is now closed.", zkUrl);
-        }
-
-        @Override
-        public String toString() {
-            return zkUrl;
-        }
-    }
-
     public static void main(String[] args) throws Exception {
         Configuration conf = HBaseConfiguration.create();
         int retCode = ToolRunner.run(conf, new PhoenixHAAdminTool(), args);
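
The znode update above is built on Curator's DistributedAtomicValue, which only writes when the current data still equals the expected old value. A minimal sketch of that compare-and-set pattern, assuming a local ZooKeeper at localhost:2181 and an already-created znode (the path and payloads are illustrative, not part of this commit):

    import java.nio.charset.StandardCharsets;
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.atomic.AtomicValue;
    import org.apache.curator.framework.recipes.atomic.DistributedAtomicValue;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class CasUpdateSketch {
        public static void main(String[] args) throws Exception {
            RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
            try (CuratorFramework client =
                    CuratorFrameworkFactory.newClient("localhost:2181", retry)) {
                client.start();
                DistributedAtomicValue value =
                        new DistributedAtomicValue(client, "/demo-ha-group", retry);
                // The write succeeds only if the znode still holds the expected
                // old bytes, which is how stale concurrent writers get rejected.
                AtomicValue<byte[]> result = value.compareAndSet(
                        "v1".getBytes(StandardCharsets.UTF_8),
                        "v2".getBytes(StandardCharsets.UTF_8));
                System.out.println("CAS succeeded: " + result.succeeded());
            }
        }
    }
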
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 80e8bca6b1..be3801d118 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -382,6 +382,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String PHOENIX_VIEW_TTL_ENABLED = "phoenix.view.ttl.enabled";
 
     public static final String PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT = "phoenix.view.ttl.tenant_views_per_scan.limit";
+    // Block mutations based on cluster role record
+    public static final String CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = "phoenix.cluster.role.based.mutation.block.enabled";
 
     // Before 4.15 when we created a view we included the parent table column metadata in the view
     // metadata. After PHOENIX-3534 we allow SYSTEM.CATALOG to split and no longer store the parent
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index c4f3812b51..9755572566 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG;
 import static org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
 import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
 import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
 import static org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED;
@@ -450,6 +451,8 @@ public class QueryServicesOptions {
 
     public static final int DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT = 300000; // 5 minutes
 
+    public static final Boolean DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false;
+
 
     private final Configuration config;
 
@@ -553,7 +556,8 @@ public class QueryServicesOptions {
                 DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX)
             .setIfUnset(MAX_IN_LIST_SKIP_SCAN_SIZE, DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE)
             .setIfUnset(CONNECTION_ACTIVITY_LOGGING_ENABLED, DEFAULT_CONNECTION_ACTIVITY_LOGGING_ENABLED)
-            .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL, DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS);
+            .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL, DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS)
+            .setIfUnset(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED);
 
         // HBase sets this to 1, so we reset it to something more appropriate.
        // Hopefully HBase will change this, because we can't know if a user set
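
A hedged example of turning the new knob on: the key string matches the constant added above and the default is false, so mutation blocking is opt-in. The standalone class below is illustrative only, not part of this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class MutationBlockConfigSketch {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            // Region server hooks consult this flag before letting a batch
            // of mutations proceed during an ACTIVE_TO_STANDBY transition.
            conf.setBoolean("phoenix.cluster.role.based.mutation.block.enabled", true);
            System.out.println(conf.getBoolean(
                    "phoenix.cluster.role.based.mutation.block.enabled", false));
        }
    }
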
diff --git a/phoenix-core-client/src/main/protobuf/RegionServerEndpointService.proto b/phoenix-core-client/src/main/protobuf/RegionServerEndpointService.proto
index 2d0da268ba..725f3fc763 100644
--- a/phoenix-core-client/src/main/protobuf/RegionServerEndpointService.proto
+++ b/phoenix-core-client/src/main/protobuf/RegionServerEndpointService.proto
@@ -50,10 +50,19 @@ message InvalidateServerMetadataCacheRequest {
   repeated InvalidateServerMetadataCache invalidateServerMetadataCacheRequests = 1;
 }
 
+message InvalidateHAGroupStoreClientRequest {
+}
+
+message InvalidateHAGroupStoreClientResponse {
+}
+
 service RegionServerEndpointService {
   rpc validateLastDDLTimestamp(ValidateLastDDLTimestampRequest)
       returns (ValidateLastDDLTimestampResponse);
 
   rpc invalidateServerMetadataCache(InvalidateServerMetadataCacheRequest)
       returns (InvalidateServerMetadataCacheResponse);
+
+  rpc invalidateHAGroupStoreClient(InvalidateHAGroupStoreClientRequest)
+      returns (InvalidateHAGroupStoreClientResponse);
 }
\ No newline at end of file
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index 59fd1209db..22fa84c4a6 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.cache.ServerMetadataCacheImpl;
 import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
 import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
 import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -106,6 +107,24 @@ public class PhoenixRegionServerEndpoint
         }
     }
 
+    @Override
+    public void invalidateHAGroupStoreClient(RpcController controller,
+            RegionServerEndpointProtos.InvalidateHAGroupStoreClientRequest request,
+            RpcCallback<RegionServerEndpointProtos.InvalidateHAGroupStoreClientResponse> done) {
+        LOGGER.info("PhoenixRegionServerEndpoint invalidating HAGroupStoreClient");
+        HAGroupStoreManager haGroupStoreManager;
+        try {
+            haGroupStoreManager = HAGroupStoreManager.getInstance(conf);
+            haGroupStoreManager.invalidateHAGroupStoreClient();
+        } catch (Throwable t) {
+            String errorMsg = "Invalidating HAGroupStoreClient FAILED, check exception for "
+                    + "specific details";
+            LOGGER.error(errorMsg,  t);
+            IOException ioe = ClientUtil.createIOException(errorMsg, t);
+            ProtobufUtil.setControllerException(controller, ioe);
+        }
+    }
+
     @Override
     public Iterable<Service> getServices() {
         return Collections.singletonList(this);
@@ -114,4 +133,5 @@ public class PhoenixRegionServerEndpoint
     public ServerMetadataCache getServerMetadataCache() {
         return ServerMetadataCacheImpl.getInstance(conf);
     }
+
 }
\ No newline at end of file
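
For context, a client could reach the new endpoint roughly as sketched below. This assumes an HBase 2.x Admin where coprocessorService(ServerName) exposes a region server channel; the connection setup and iteration over servers are assumptions for illustration, not code from this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
    import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;

    public class InvalidateHAGroupStoreClientSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Admin admin = conn.getAdmin()) {
                // Invalidate the cached HA group state on every region server.
                for (ServerName server : admin.getRegionServers()) {
                    CoprocessorRpcChannel channel = admin.coprocessorService(server);
                    RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface stub =
                            RegionServerEndpointProtos.RegionServerEndpointService.newBlockingStub(channel);
                    stub.invalidateHAGroupStoreClient(null,
                            RegionServerEndpointProtos.InvalidateHAGroupStoreClientRequest
                                    .newBuilder().build());
                }
            }
        }
    }
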
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 74c965345a..915db903b0 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
@@ -88,6 +89,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicyHelper;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicyHelper.MutateCommand;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
@@ -1055,4 +1057,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     protected boolean isRegionObserverFor(Scan scan) {
         return scan.getAttribute(BaseScannerRegionObserverConstants.UNGROUPED_AGG) != null;
     }
+
+    @Override
+    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+                               MiniBatchOperationInProgress<Mutation> miniBatchOp)
+            throws IOException {
+        final Configuration conf = c.getEnvironment().getConfiguration();
+        try {
+            final HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(conf);
+            if (haGroupStoreManager.isMutationBlocked()) {
+                throw new IOException("Blocking Mutation as Some CRRs are in ACTIVE_TO_STANDBY "
+                        + "state and CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is true");
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
 }
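
The hook added above follows the usual preBatchMutate veto pattern: throwing from the hook fails the whole mini-batch before any cell is written. A stripped-down illustration of the same pattern is below; this class is not part of the commit and, unlike the real code, it consults only the configuration flag rather than the cached cluster role records.

    import java.io.IOException;
    import java.util.Optional;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.RegionObserver;
    import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;

    public class MutationGuardSketch implements RegionCoprocessor, RegionObserver {
        @Override
        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(this);
        }

        @Override
        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
            boolean blocked = c.getEnvironment().getConfiguration()
                    .getBoolean("phoenix.cluster.role.based.mutation.block.enabled", false);
            if (blocked) {
                // Throwing here aborts the mini-batch before any write lands.
                throw new IOException("Mutations are blocked during role transition");
            }
        }
    }
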
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 1223c3a980..72ef706566 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
+import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
@@ -160,6 +161,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
     private static final OperationStatus NOWRITE = new OperationStatus(SUCCESS);
     public static final String PHOENIX_APPEND_METADATA_TO_WAL = "phoenix.append.metadata.to.wal";
     public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false;
+
     /**
      * Class to represent pending data table rows
      * */
@@ -512,6 +514,9 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
       }
   }
 
+    /*
+     * Also checks for mutationBlockEnabled if CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is enabled.
+     */
   @Override
   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
@@ -519,6 +524,12 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
           return;
       }
       try {
+          final Configuration conf = c.getEnvironment().getConfiguration();
+          final HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(conf);
+          if (haGroupStoreManager.isMutationBlocked()) {
+              throw new IOException("Blocking Mutation as Some CRRs are in ACTIVE_TO_STANDBY "
+                      + "state and CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is true");
+          }
           preBatchMutateWithExceptions(c, miniBatchOp);
           return;
       } catch (Throwable t) {
@@ -1303,6 +1314,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
             return ts;
         }
     }
+
     public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
                                              MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
         PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 691731627d..4987d477c1 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -364,6 +364,10 @@
             <artifactId>curator-framework</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+        </dependency>
 
 
         <!-- Other test dependencies -->
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
index fa051203ca..f92db070e2 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java
@@ -39,7 +39,6 @@ import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -73,7 +72,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_DDL_TIMESTAMP
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.query.ConnectionQueryServicesImpl.INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE;
 import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java
index 8996939290..670aba7f15 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java
@@ -32,7 +32,6 @@ import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.ServerUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -158,6 +157,18 @@ public class PhoenixRegionServerEndpointIT extends BaseTest {
         }
     }
 
+    @Test
+    public void testInvalidateHAGroupStoreClient() {
+        HRegionServer regionServer = utility.getMiniHBaseCluster().getRegionServer(0);
+        PhoenixRegionServerEndpoint coprocessor = getPhoenixRegionServerEndpoint(regionServer);
+        assertNotNull(coprocessor);
+        ServerRpcController controller = new ServerRpcController();
+        RegionServerEndpointProtos.InvalidateHAGroupStoreClientRequest request
+                = RegionServerEndpointProtos.InvalidateHAGroupStoreClientRequest.newBuilder().build();
+        coprocessor.invalidateHAGroupStoreClient(controller, request, null);
+        assertFalse(controller.failed());
+    }
+
     private String getCreateTableStmt(String tableName) {
         return   "CREATE TABLE " + tableName +
                 "  (a_string varchar not null, col1 integer" +
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
new file mode 100644
index 0000000000..cd0679e9c2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+/**
+ * Integration tests for {@link HAGroupStoreClient}
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class HAGroupStoreClientIT extends BaseTest {
+
+    private final PhoenixHAAdmin haAdmin = new PhoenixHAAdmin(config);
+    private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 1000L;
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Before
+    public void before() throws Exception {
+        // Clean up all the existing CRRs
+        List<ClusterRoleRecord> crrs = haAdmin.listAllClusterRoleRecordsOnZookeeper();
+        for (ClusterRoleRecord crr : crrs) {
+            haAdmin.getCurator().delete().forPath(toPath(crr.getHaGroupName()));
+        }
+    }
+
+    @Test
+    public void testHAGroupStoreClientWithSingleCRR() throws Exception {
+        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(config);
+        ClusterRoleRecord crr = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1;
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+
+        // Now Update CRR so that current cluster has state ACTIVE_TO_STANDBY
+        crr = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        // Check that now the cluster should be in ActiveToStandby
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).isEmpty();
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1;
+
+        // Change it back to ACTIVE so that cluster is not in ACTIVE_TO_STANDBY state
+        crr = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 3L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1;
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+
+        // Change it again to ACTIVE_TO_STANDBY so that we can validate watcher works repeatedly
+        crr = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 4L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).isEmpty();
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1;
+
+        // Change peer cluster to ACTIVE_TO_STANDBY so that we can still process mutation on this cluster
+        crr = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, 5L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1;
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+    }
+
+
+    @Test
+    public void testHAGroupStoreClientWithMultipleCRRs() throws Exception {
+        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(config);
+        // Setup initial CRRs
+        ClusterRoleRecord crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 2;
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+
+        // Now Update CRR so that current cluster has state ACTIVE_TO_STANDBY for only 1 crr
+        crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+        crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        // Check that now the cluster should be in ActiveToStandby
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1;
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1;
+
+        // Change it back to ACTIVE so that cluster is not in ACTIVE_TO_STANDBY state
+        crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 3L);
+        crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 3L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 2;
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+
+        // Change other crr to ACTIVE_TO_STANDBY and one in ACTIVE state so that we can validate watcher works repeatedly
+        crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 4L);
+        crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 4L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1;
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1;
+
+        // Change peer cluster to ACTIVE_TO_STANDBY so that we can still process mutation on this cluster
+        crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, 5L);
+        crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, 5L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 2;
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+    }
+
+    @Test
+    public void testMultiThreadedAccessToHACache() throws Exception {
+        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(config);
+        // Setup initial CRRs
+        ClusterRoleRecord crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        int threadCount = 10;
+        final CountDownLatch latch = new CountDownLatch(threadCount);
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        for (int i = 0; i < threadCount; i++) {
+            executor.submit(() -> {
+                try {
+                    assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 2;
+                    assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+                    latch.countDown();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+        assert latch.await(10, TimeUnit.SECONDS);
+
+        // Update CRRs
+        crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+        crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        final CountDownLatch latch2 = new CountDownLatch(threadCount);
+        executor = Executors.newFixedThreadPool(threadCount);
+        for (int i = 0; i < threadCount; i++) {
+            executor.submit(() -> {
+                try {
+                    assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1;
+                    assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1;
+                    latch2.countDown();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+        assert latch2.await(10, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testHAGroupStoreClientWithRootPathDeletion() throws Exception {
+        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(config);
+        ClusterRoleRecord crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1;
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1;
+
+        haAdmin.getCurator().delete().deletingChildrenIfNeeded().forPath(ZKPaths.PATH_SEPARATOR);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).isEmpty();
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty();
+
+        crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+        crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1;
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1;
+    }
+
+    @Test
+    public void testThrowsExceptionWithZKDisconnectionAndThenConnection() throws Exception {
+        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(config);
+        // Setup initial CRRs
+        ClusterRoleRecord crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 2;
+
+        // Shutdown the ZK Cluster to simulate CONNECTION_SUSPENDED event
+        utility.shutdownMiniZKCluster();
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        //Check that HAGroupStoreClient instance is not healthy and throws IOException
+        assertThrows(IOException.class, () -> haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE));
+
+        // Start ZK on the same port to simulate CONNECTION_RECONNECTED event
+        utility.startMiniZKCluster(1,Integer.parseInt(getZKClientPort(config)));
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        //Check that HAGroupStoreClient instance is back to healthy and provides correct response
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 2;
+    }
+
+
+    @Test
+    public void testHAGroupStoreClientWithDifferentZKURLFormats() throws Exception {
+        HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(config);
+        final String zkClientPort = getZKClientPort(config);
+        // Setup initial CRRs
+        final String format1 = "127.0.0.1\\:"+zkClientPort+"::/hbase"; // 127.0.0.1\:53228::/hbase
+        final String format2 = "127.0.0.1:"+zkClientPort+"::/hbase"; //   127.0.0.1:53228::/hbase
+        final String format3 = "127.0.0.1\\:"+zkClientPort+":/hbase";   // 127.0.0.1\:53228:/hbase
+
+        ClusterRoleRecord crr1 = new ClusterRoleRecord("parallel1",
+                HighAvailabilityPolicy.PARALLEL, format1, ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+
+        ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel2",
+                HighAvailabilityPolicy.PARALLEL, format2, ClusterRoleRecord.ClusterRole.STANDBY,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        ClusterRoleRecord crr3 = new ClusterRoleRecord("parallel3",
+                HighAvailabilityPolicy.PARALLEL, format3, ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr3);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1;
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1;
+        assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.STANDBY).size() == 1;
+    }
+
+
+    /**
+     * This test verifies that the updates coming via 
PathChildrenCacheListener are in order in which updates are sent to ZK
+     * @throws Exception
+     */
+    @Test
+    public void testHAGroupStoreClientWithMultiThreadedUpdates() throws 
Exception {
+        // Number of threads to execute
+        int threadCount = 15;
+
+        // Capture versions of crr in a list(crrEventVersions)  in order they 
are received.
+        List<Integer> crrEventVersions = new ArrayList<>();
+        CountDownLatch eventsLatch = new CountDownLatch(threadCount);
+        PathChildrenCacheListener pathChildrenCacheListener = (client, event) 
-> {
+            if(event.getData() != null && event.getData().getData() != null && 
ClusterRoleRecord.fromJson(event.getData().getData()).isPresent()) {
+                    ClusterRoleRecord crr = 
ClusterRoleRecord.fromJson(event.getData().getData()).get();
+                    crrEventVersions.add((int)crr.getVersion());
+                    eventsLatch.countDown();
+            }
+        };
+
+        // Start a new HAGroupStoreClient.
+        new HAGroupStoreClient(config, pathChildrenCacheListener);
+
+        // Create multiple threads for update to ZK.
+        final CountDownLatch updateLatch = new CountDownLatch(threadCount);
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+        // List captures the order of events that are sent.
+        List<Integer> updateList = new ArrayList<>();
+
+        // Create a queue which can be polled to send updates to ZK.
+        ConcurrentLinkedQueue<ClusterRoleRecord> updateQueue = new 
ConcurrentLinkedQueue<>();
+        for(int i = 0; i < threadCount; i++) {
+            updateQueue.add(createCRR(i+1));
+            updateList.add(i+1);
+        }
+
+        // Submit updates to ZK.
+        for (int i = 0; i < threadCount; i++) {
+            executor.submit(() -> {
+                try {
+                    synchronized (HAGroupStoreClientIT.class) {
+                        
haAdmin.createOrUpdateDataOnZookeeper(Objects.requireNonNull(updateQueue.poll()));
+                    }
+                    updateLatch.countDown();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+
+        // Wait until all updates have been sent and all events have been received.
+        assert eventsLatch.await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS * threadCount, TimeUnit.MILLISECONDS);
+        assert updateLatch.await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS * threadCount, TimeUnit.MILLISECONDS);
+
+        // Assert that the events arrived in the same order the updates were sent.
+        assert updateList.equals(crrEventVersions);
+    }
+
+    private ClusterRoleRecord createCRR(Integer version) {
+        return new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, version);
+    }
+
+
+}
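
For readers skimming the diff, a minimal sketch of the listener wiring the tests
above exercise. It assumes only the APIs visible in this commit: the
HAGroupStoreClient(Configuration, PathChildrenCacheListener) constructor,
ClusterRoleRecord.fromJson returning an Optional, and the
getHaGroupName/getVersion accessors. The configuration source and the println
handler are illustrative, not the committed code.

    package org.apache.phoenix.jdbc;  // the ITs above live in this package

    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CrrListenerSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();  // illustrative config source
            // Curator delivers child events on a single event thread, in the
            // order the znodes were written -- the property the multi-threaded
            // test above asserts by comparing update order to event order.
            PathChildrenCacheListener listener = (client, event) -> {
                if (event.getData() != null && event.getData().getData() != null) {
                    ClusterRoleRecord.fromJson(event.getData().getData()).ifPresent(crr ->
                            System.out.println(crr.getHaGroupName() + " -> v" + crr.getVersion()));
                }
            };
            new HAGroupStoreClient(conf, listener);  // registers the listener on the ZK-backed cache
        }
    }
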
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
new file mode 100644
index 0000000000..9dfbb6cb4f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+import static org.junit.Assert.assertFalse;
+
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HAGroupStoreManagerIT extends BaseTest {
+    private final PhoenixHAAdmin haAdmin = new PhoenixHAAdmin(config);
+    private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 1000L;
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Before
+    public void before() throws Exception {
+        // Clean up all the existing CRRs
+        List<ClusterRoleRecord> crrs = haAdmin.listAllClusterRoleRecordsOnZookeeper();
+        for (ClusterRoleRecord crr : crrs) {
+            haAdmin.getCurator().delete().forPath(toPath(crr.getHaGroupName()));
+        }
+    }
+
+    @Test
+    public void testHAGroupStoreManagerWithSingleCRR() throws Exception {
+        HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config);
+        // Set up initial CRRs
+        ClusterRoleRecord crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        assertFalse(haGroupStoreManager.isMutationBlocked());
+
+        crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+        crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+        assert haGroupStoreManager.isMutationBlocked();
+    }
+}
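
The manager test above pivots on one behavior: once a CRR in the group reports
ACTIVE_TO_STANDBY, isMutationBlocked() flips to true. Below is a minimal guard
sketch under that assumption, using only HAGroupStoreManager.getInstance(Configuration)
and isMutationBlocked() from this diff; the guard method, its name, and the
IOException are illustrative, not part of the commit.

    package org.apache.phoenix.jdbc;  // placed here so the manager is visible

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;

    public final class MutationGuardSketch {
        // Illustrative pre-write check: a server-side write path could call
        // this before applying a mutation. Only getInstance(...) and
        // isMutationBlocked() come from this diff; the rest is assumed.
        public static void checkMutationAllowed(Configuration conf) throws Exception {
            HAGroupStoreManager manager = HAGroupStoreManager.getInstance(conf);
            if (manager.isMutationBlocked()) {
                // A CRR in this HA group is in ACTIVE_TO_STANDBY, so writes are
                // rejected while the failover drains.
                throw new IOException("Mutation blocked during HA role transition");
            }
        }
    }
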
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
index 27b627753f..4a7c6bce91 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.jdbc;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.phoenix.end2end.PhoenixRegionServerEndpointTestImpl;
 import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.commons.lang3.RandomUtils;
@@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
-import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +61,8 @@ import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCo
 import static org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT;
 import static org.apache.phoenix.jdbc.FailoverPhoenixConnection.FAILOVER_TIMEOUT_MS_ATTR;
 import static org.apache.phoenix.jdbc.HighAvailabilityGroup.*;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.HighAvailibilityCuratorProvider;
+
 import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -86,8 +86,8 @@ public class HighAvailabilityTestingUtility {
         private String url1;
         /** The host\:port::/hbase format of the JDBC string for HBase cluster 2. */
         private String url2;
-        private PhoenixHAAdminHelper haAdmin1;
-        private PhoenixHAAdminHelper haAdmin2;
+        private PhoenixHAAdmin haAdmin1;
+        private PhoenixHAAdmin haAdmin2;
         private Admin admin1;
         private Admin admin2;
         @VisibleForTesting
@@ -121,8 +121,8 @@ public class HighAvailabilityTestingUtility {
             url1 = String.format("%s\\:%d::/hbase", confAddress1, hbaseCluster1.getZkCluster().getClientPort());
             url2 = String.format("%s\\:%d::/hbase", confAddress2, hbaseCluster2.getZkCluster().getClientPort());
 
-            haAdmin1 = new PhoenixHAAdminHelper(getUrl1(), hbaseCluster1.getConfiguration(), PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE);
-            haAdmin2 = new PhoenixHAAdminHelper(getUrl2(), hbaseCluster2.getConfiguration(), PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE);
+            haAdmin1 = new PhoenixHAAdmin(getUrl1(), hbaseCluster1.getConfiguration(), HighAvailibilityCuratorProvider.INSTANCE);
+            haAdmin2 = new PhoenixHAAdmin(getUrl2(), hbaseCluster2.getConfiguration(), HighAvailibilityCuratorProvider.INSTANCE);
 
             admin1 = hbaseCluster1.getConnection().getAdmin();
             admin2 = hbaseCluster2.getConnection().getAdmin();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java
index eeafec80ba..e00d15c515 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.phoenix.jdbc;
 
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
 import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.RET_SUCCESS;
-import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.HighAvailibilityCuratorProvider;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -46,7 +48,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
-import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.junit.After;
 import org.junit.Before;
@@ -59,7 +60,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Unit test for {@link PhoenixHAAdminTool} including the helper class {@link PhoenixHAAdminHelper}.
+ * Unit test for {@link PhoenixHAAdminTool} including the helper class {@link PhoenixHAAdmin}.
  *
  * @see PhoenixHAAdminToolIT
  */
@@ -70,12 +71,13 @@ public class PhoenixHAAdminToolTest {
     private static final PrintStream STDOUT = System.out;
     private static final ByteArrayOutputStream STDOUT_CAPTURE = new ByteArrayOutputStream();
 
-    private final PhoenixHAAdminTool.HighAvailibilityCuratorProvider mockHighAvailibilityCuratorProvider = Mockito.mock(PhoenixHAAdminTool.HighAvailibilityCuratorProvider.class);
+    private final HighAvailibilityCuratorProvider mockHighAvailibilityCuratorProvider = Mockito.mock(HighAvailibilityCuratorProvider.class);
 
     /** Use mocked curator since there is no mini-ZK cluster. */
     private final CuratorFramework curator = Mockito.mock(CuratorFramework.class);
     /** HA admin to test for one test case. */
-    private final PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(ZK1, new Configuration(), mockHighAvailibilityCuratorProvider);
+    private final PhoenixHAAdmin admin =
+            new PhoenixHAAdmin(ZK1, new Configuration(), mockHighAvailibilityCuratorProvider);
 
     private String haGroupName;
     private ClusterRoleRecord recordV1;

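To make the renamed fixture concrete: a sketch of unit-testing PhoenixHAAdmin
without a mini-ZK cluster, using only the three-argument constructor and the
nested HighAvailibilityCuratorProvider shown above. The quorum string is a
placeholder (ZK1 in the real test is defined elsewhere in that class), and the
provider stubbing is omitted because its accessor signature does not appear in
this diff.

    package org.apache.phoenix.jdbc;  // same package as the test above

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.jdbc.PhoenixHAAdmin.HighAvailibilityCuratorProvider;
    import org.mockito.Mockito;

    public class HaAdminFixtureSketch {
        // Mocked provider/curator mirror the fixture above; individual test
        // cases would stub the provider to hand back the mocked curator.
        private final HighAvailibilityCuratorProvider provider =
                Mockito.mock(HighAvailibilityCuratorProvider.class);
        private final CuratorFramework curator = Mockito.mock(CuratorFramework.class);
        private final PhoenixHAAdmin admin =
                new PhoenixHAAdmin("zk1.example.com:2181", new Configuration(), provider);
    }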