gjacoby126 commented on code in PR #1430:
URL: https://github.com/apache/phoenix/pull/1430#discussion_r875277029


##########
phoenix-core/src/it/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorToolIT.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static 
org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GENERATOR_FILE_ATTR;
+import static 
org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GROUPS_ATTR;
+import static 
org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GROUP_POLICY_ATTR_FORMAT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for {@link ClusterRoleRecordGeneratorTool}.
+ *
+ * @see ClusterRoleRecordGeneratorToolTest
+ */
+public class ClusterRoleRecordGeneratorToolIT {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClusterRoleRecordGeneratorToolIT.class);
+    private static final HBaseTestingUtilityPair CLUSTERS = new 
HBaseTestingUtilityPair();
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        CLUSTERS.start();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        CLUSTERS.close();
+    }
+
+    @Test
+    public void testRun() throws Exception {
+        ClusterRoleRecordGeneratorTool generator = new 
ClusterRoleRecordGeneratorTool();
+        Configuration conf = new 
Configuration(CLUSTERS.getHBaseCluster1().getConfiguration());
+        String haGroupNames = String.format("%s,%s,%s", 
testName.getMethodName(),
+                testName.getMethodName() + 1, testName.getMethodName() + 2);
+        conf.set(PHOENIX_HA_GROUPS_ATTR, haGroupNames);
+        // create a temp file
+        File file = File.createTempFile("phoenix.ha.cluster.role.records", 
".json");
+        file.deleteOnExit();
+        conf.set(PHOENIX_HA_GENERATOR_FILE_ATTR, file.getAbsolutePath());
+
+        generator.setConf(conf);
+        int ret = ToolRunner.run(conf, new ClusterRoleRecordGeneratorTool(), 
new String[]{});
+        assertEquals(0, ret);
+        String recordJson = FileUtils.readFileToString(file);
+        LOG.info("The created file content is: \n{}", recordJson);
+        for (String haGroupName : haGroupNames.split(",")) {
+            assertTrue(recordJson.contains(haGroupName));
+        }
+    }
+
+    @Test
+    public void testListAllRecordsByGenerator1() throws Exception {
+        ClusterRoleRecordGeneratorTool generator1 = new 
ClusterRoleRecordGeneratorTool();
+        Configuration conf1 = new 
Configuration(CLUSTERS.getHBaseCluster1().getConfiguration());
+        conf1.set(PHOENIX_HA_GROUPS_ATTR, testName.getMethodName());
+        // explicitly set the failover policy as FAILOVER
+        conf1.set(String.format(PHOENIX_HA_GROUP_POLICY_ATTR_FORMAT, 
testName.getMethodName()),
+                HighAvailabilityPolicy.FAILOVER.name());
+        generator1.setConf(conf1);
+
+        // created with cluster1's conf, so cluster1 is ACTIVE
+        List<ClusterRoleRecord> records = generator1.listAllRecordsByZk();

Review Comment:
   What is actually creating the records that we're listing here?



##########
phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java:
##########
@@ -0,0 +1,879 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
+import static org.apache.hadoop.test.GenericTestUtils.waitFor;
+import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION;
+import static 
org.apache.phoenix.jdbc.FailoverPhoenixConnection.FAILOVER_TIMEOUT_MS_ATTR;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneZKDown;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithStatement;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup;
+import static 
org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.FailoverSQLException;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test failover basics for {@link FailoverPhoenixConnection}.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class FailoverPhoenixConnectionIT {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FailoverPhoenixConnectionIT.class);
+    private static final HBaseTestingUtilityPair CLUSTERS = new 
HBaseTestingUtilityPair();
+
+    @Rule
+    public final TestName testName = new TestName();
+    @Rule
+    public final Timeout globalTimeout= new Timeout(300, TimeUnit.SECONDS);
+
+    /** Client properties to create a connection per test. */
+    private Properties clientProperties;
+    /** HA group for this test. */
+    private HighAvailabilityGroup haGroup;
+    /** Table name per test case. */
+    private String tableName;
+    /** HA Group name for this test. */
+    private String haGroupName;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        CLUSTERS.start();
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+        CLUSTERS.close();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        haGroupName = testName.getMethodName();
+        clientProperties = new Properties();
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+        // Set some client configurations to make test run faster
+        clientProperties.setProperty(COLLECT_REQUEST_LEVEL_METRICS, 
String.valueOf(true));
+        clientProperties.setProperty(FAILOVER_TIMEOUT_MS_ATTR, "30000");
+        clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY, "3");
+        clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY, 
"1000");
+        clientProperties.setProperty(ZK_SESSION_TIMEOUT, "3000");
+        clientProperties.setProperty(PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY, 
"3000");
+        clientProperties.setProperty("zookeeper.recovery.retry.maxsleeptime", 
"1000");
+
+        // Make first cluster ACTIVE
+        CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER);
+
+        haGroup = getHighAvailibilityGroup(CLUSTERS.getJdbcUrl(), 
clientProperties);
+        LOG.info("Initialized haGroup {} with URL {}", haGroup, 
CLUSTERS.getJdbcUrl());
+        tableName = testName.getMethodName().toUpperCase();
+        CLUSTERS.createTableOnClusterPair(tableName);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            haGroup.close();
+            PhoenixDriver.INSTANCE
+                    .getConnectionQueryServices(CLUSTERS.getUrl1(), 
haGroup.getProperties())
+                    .close();
+            PhoenixDriver.INSTANCE
+                    .getConnectionQueryServices(CLUSTERS.getUrl2(), 
haGroup.getProperties())
+                    .close();
+        } catch (Exception e) {
+            LOG.error("Fail to tear down the HA group and the CQS. Will 
ignore", e);
+        }
+    }
+
+    /**
+     * Test Phoenix connection creation and basic operations with HBase 
cluster pair.
+     */
+    @Test
+    public void testOperationUsingConnection() throws Exception {
+        try (Connection conn = createFailoverConnection()) {
+            doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        }
+    }
+
+    /**
+     * Test close() once more should not fail, as the second close should be a 
no-op.
+     */
+    @Test
+    public void testCloseConnectionOnceMore() throws Exception {
+        Connection conn = createFailoverConnection();
+        doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        conn.close();
+        conn.close(); // this is NOT duplicate code, but instead this is 
essential for this test.
+    }
+
+    /**
+     * Tests that new Phoenix connections are not created during failover.
+     */
+    @Test
+    public void testConnectionCreationFailsIfNoActiveCluster() throws 
Exception {
+        try (Connection conn = createFailoverConnection()) {
+            doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        }
+
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.STANDBY);
+
+        try {
+            createFailoverConnection();
+            fail("Should have failed because neither cluster is ACTIVE");
+        } catch (SQLException e) {
+            LOG.info("Got expected exception when creating new connection", e);
+            assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), 
e.getErrorCode());
+        } // all other type of exception will fail this test.
+    }
+
+    /**
+     * Tests new Phoenix connections are created if one cluster is OFFLINE and 
the other ACTIVE.
+     */
+    @Test
+    public void testConnectionOneOfflineOneActive() throws Exception {
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.OFFLINE, 
ClusterRole.ACTIVE);
+
+        try (Connection conn = createFailoverConnection()) {
+            doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        }
+    }
+
+    /**
+     * Test that failover can finish according to the policy.
+     *
+     * In this test case, there is no existing CQS or open connections to 
close.
+     *
+     * @see #testFailoverCanFinishWhenOneZKDownWithCQS
+     */
+    @Test
+    public void testFailoverCanFinishWhenOneZKDown() throws Exception {

Review Comment:
   Didn't this test use to be flaky in our internal CI? If so, has it been 
stabilized? We should make sure we're not introducing more flaky tests upstream.



##########
phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java:
##########
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.apache.hadoop.test.GenericTestUtils.waitFor;
+import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneHBaseDown;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneZKDown;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
+import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for {@link HighAvailabilityGroup} with mini clusters.
+ *
+ * @see HighAvailabilityGroupTestIT
+ */
+@SuppressWarnings("UnstableApiUsage")
+@Category(NeedsOwnMiniClusterTest.class)
+public class HighAvailabilityGroupIT {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HighAvailabilityGroupIT.class);
+    private static final HBaseTestingUtilityPair CLUSTERS = new 
HBaseTestingUtilityPair();
+
+    /** Client properties to create a connection per test. */
+    private Properties clientProperties;
+    /** JDBC connection string for this test HA group. */
+    private String jdbcUrl;
+    /** Failover HA group for to test. */
+    private HighAvailabilityGroup haGroup;
+    /** HA Group name for this test. */
+    private String haGroupName;
+
+    @Rule
+    public final TestName testName = new TestName();
+    @Rule
+    public final Timeout globalTimeout = new Timeout(180, TimeUnit.SECONDS);
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        CLUSTERS.start();
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        HighAvailabilityGroup.CURATOR_CACHE.invalidateAll();
+        DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+        CLUSTERS.close();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        haGroupName = testName.getMethodName();
+        clientProperties = new Properties();
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+        clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY, "3");
+        clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY, 
"2000");
+        // Make first cluster ACTIVE
+        CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER);
+        jdbcUrl = CLUSTERS.getJdbcUrl();
+        haGroup = getHighAvailibilityGroup(jdbcUrl,clientProperties);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        haGroup.close();
+        try {
+            PhoenixDriver.INSTANCE
+                    .getConnectionQueryServices(CLUSTERS.getUrl1(), 
haGroup.getProperties())
+                    .close();
+            PhoenixDriver.INSTANCE
+                    .getConnectionQueryServices(CLUSTERS.getUrl2(), 
haGroup.getProperties())
+                    .close();
+        } catch (Exception e) {
+            LOG.error("Fail to tear down the HA group and the CQS. Will 
ignore", e);
+        }
+    }
+
+    /**
+     * Test get static method.
+     */
+    @Test
+    public void testGet() throws Exception {
+        // Client will get the same HighAvailabilityGroup using the same 
information as key
+        Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
+        try {
+            haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup2.isPresent());
+            assertSame(haGroup, haGroup2.get());
+        } finally {
+            haGroup2.ifPresent(HighAvailabilityGroup::close);
+        }
+
+        // Client will get a different HighAvailabilityGroup when group name 
is different
+        String haGroupName3 = testName.getMethodName() + 
RandomStringUtils.randomAlphabetic(3);
+        CLUSTERS.initClusterRole(haGroupName3, 
HighAvailabilityPolicy.FAILOVER);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName3);
+        Optional<HighAvailabilityGroup> haGroup3 = Optional.empty();
+        try {
+            haGroup3 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup3.isPresent());
+            assertNotSame(haGroup, haGroup3.get());
+        } finally {
+            haGroup3.ifPresent(HighAvailabilityGroup::close);
+        }
+
+        // Client will get the same HighAvailabilityGroup using the same 
information as key again
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, 
haGroup.getGroupInfo().getName());
+        Optional<HighAvailabilityGroup> haGroup4 = Optional.empty();
+        try {
+            haGroup4 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup4.isPresent());
+            assertSame(haGroup, haGroup4.get());
+        } finally {
+            haGroup4.ifPresent(HighAvailabilityGroup::close);
+        }
+    }
+
+    /**
+     * Test that HA group should see latest version of cluster role record.
+     */
+    @Test
+    public void testGetWithDifferentRecordVersion() throws Exception {
+        String haGroupName2 = testName.getMethodName() + 
RandomStringUtils.randomAlphabetic(3);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+
+        // create cluster role records with different versions on two ZK 
clusters
+        final String zpath = ZKPaths.PATH_SEPARATOR + haGroupName2;
+        ClusterRoleRecord record1 = new ClusterRoleRecord(
+                haGroupName2, HighAvailabilityPolicy.FAILOVER,
+                CLUSTERS.getUrl1(), ClusterRole.ACTIVE,
+                CLUSTERS.getUrl2(), ClusterRole.STANDBY,
+                1);
+        CLUSTERS.createCurator1().create().forPath(zpath, 
ClusterRoleRecord.toJson(record1));
+        ClusterRoleRecord record2 = new ClusterRoleRecord(
+                record1.getHaGroupName(), record1.getPolicy(),
+                record1.getZk1(), record1.getRole1(),
+                record1.getZk2(), record1.getRole2(),
+                record1.getVersion() + 1); // record2 is newer
+        CLUSTERS.createCurator2().create().forPath(zpath, 
ClusterRoleRecord.toJson(record2));
+
+        Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
+        try {
+            haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup2.isPresent());
+            assertNotSame(haGroup2.get(), haGroup);
+            // HA group should see latest version when both role managers are 
started
+            HighAvailabilityGroup finalHaGroup = haGroup2.get();
+            waitFor(() -> record2.equals(finalHaGroup.getRoleRecord()), 100, 
30_000);
+        } finally {
+            haGroup2.ifPresent(HighAvailabilityGroup::close);
+        }
+    }
+
+    /**
+     * Test that client can get an HA group when ACTIVE ZK cluster is down.
+     *
+     * NOTE: we can not test with existing HA group because {@link 
HighAvailabilityGroup#get} would
+     * get the cached object, which has also been initialized.
+     *
+     * The reason this works is because getting an HA group depends on one ZK 
watcher connects to a
+     * ZK cluster to get the associated cluster role record. It does not 
depend on both ZK cluster
+     * being up and running. It does not actually create HBase connection 
either.
+     */
+    @Test
+    public void testCanGetHaGroupWhenActiveZKClusterDown() throws Exception {
+        String haGroupName2 = testName.getMethodName() + 2;
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+        CLUSTERS.initClusterRole(haGroupName2, 
HighAvailabilityPolicy.FAILOVER);
+
+      doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
+          HighAvailabilityGroup haGroup2 = null;
+          try {
+              haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties);
+              LOG.info("Can get the new HA group {} after both ZK clusters 
restart", haGroup2);
+          } finally {
+              if (haGroup2 != null) {
+                  haGroup2.close();
+              }
+          }
+      });
+    }
+
+    /**
+     * Test that client can not get an HA group when both ZK clusters are down.
+     *
+     * NOTE: we can not test with existing HA group because {@link 
HighAvailabilityGroup#get} would
+     * get the cached object, which has also been initialized.
+     *
+     * What if two HBase clusters instead of two ZK clusters are down? We may 
still get a new HA
+     * group because creating and initializing HA group do not set up the 
HBase connection, which
+     * should indeed fail when ACTIVE HBase cluster is down.
+     */
+    @Test
+    public void testCanNotGetHaGroupWhenTwoZKClustersDown() throws Exception {
+        String haGroupName2 = testName.getMethodName() + 2;
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+        CLUSTERS.initClusterRole(haGroupName2, 
HighAvailabilityPolicy.FAILOVER);
+
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () ->
+            doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> {
+                try {
+                    HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+                    fail("Should have failed because both ZK cluster were 
shutdown!");
+                } catch (SQLException e) {
+                    LOG.info("Got expected SQLException because both ZK 
clusters are down", e);
+                    assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), 
e.getErrorCode());
+                    assertTrue(e.getCause() instanceof IOException);
+                }
+            })
+        );
+
+        HighAvailabilityGroup haGroup2 = null;
+        try {
+            haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties);
+            LOG.info("Can get the new HA group {} after both ZK clusters 
restart", haGroup2);
+        } finally {
+            if (haGroup2 != null) {
+                haGroup2.close();
+            }
+        }
+    }
+
+    @Test
+    public void testGetHaGroupFailsFastWhenBothZKClusterDownFromBeginning() {
+        String haGroupName = testName.getMethodName();
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+        String badJdbcUrl = String.format("jdbc:phoenix:[%s|%s]", 
"127.0.0.1:0", "127.0.0.1:1");
+        LOG.info("Start testing HighAvailabilityGroup::get() when both ZK 
clusters are down...");
+        long startTime = System.currentTimeMillis();
+        try {
+            HighAvailabilityGroup.get(badJdbcUrl, clientProperties);
+            fail("Should always throw an exception.");
+        } catch (SQLException e){
+            LOG.info("Got expected exception", e);
+            assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), 
e.getErrorCode());
+        } finally {
+            LOG.info("Stop testing HighAvailabilityGroup::get() when both ZK 
clusters are down...");

Review Comment:
   nit: couldn't this be done by just putting a short timeout on the JUnit 
`@Test` annotation, i.e. `@Test(timeout = ...)`?



##########
phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java:
##########
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.apache.hadoop.test.GenericTestUtils.waitFor;
+import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneHBaseDown;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneZKDown;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
+import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for {@link HighAvailabilityGroup} with mini clusters.
+ *
+ * @see HighAvailabilityGroupTestIT
+ */
+@SuppressWarnings("UnstableApiUsage")
+@Category(NeedsOwnMiniClusterTest.class)
+public class HighAvailabilityGroupIT {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HighAvailabilityGroupIT.class);
+    private static final HBaseTestingUtilityPair CLUSTERS = new 
HBaseTestingUtilityPair();
+
+    /** Client properties to create a connection per test. */
+    private Properties clientProperties;
+    /** JDBC connection string for this test HA group. */
+    private String jdbcUrl;
+    /** Failover HA group for to test. */
+    private HighAvailabilityGroup haGroup;
+    /** HA Group name for this test. */
+    private String haGroupName;
+
+    @Rule
+    public final TestName testName = new TestName();
+    @Rule
+    public final Timeout globalTimeout = new Timeout(180, TimeUnit.SECONDS);
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        CLUSTERS.start();
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        HighAvailabilityGroup.CURATOR_CACHE.invalidateAll();
+        DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+        CLUSTERS.close();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        haGroupName = testName.getMethodName();
+        clientProperties = new Properties();
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+        clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY, "3");
+        clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY, 
"2000");
+        // Make first cluster ACTIVE
+        CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER);
+        jdbcUrl = CLUSTERS.getJdbcUrl();
+        haGroup = getHighAvailibilityGroup(jdbcUrl,clientProperties);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        haGroup.close();
+        try {
+            PhoenixDriver.INSTANCE
+                    .getConnectionQueryServices(CLUSTERS.getUrl1(), 
haGroup.getProperties())
+                    .close();
+            PhoenixDriver.INSTANCE
+                    .getConnectionQueryServices(CLUSTERS.getUrl2(), 
haGroup.getProperties())
+                    .close();
+        } catch (Exception e) {
+            LOG.error("Fail to tear down the HA group and the CQS. Will 
ignore", e);
+        }
+    }
+
+    /**
+     * Test get static method.
+     */
+    @Test
+    public void testGet() throws Exception {
+        // Client will get the same HighAvailabilityGroup using the same 
information as key
+        Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
+        try {
+            haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup2.isPresent());
+            assertSame(haGroup, haGroup2.get());
+        } finally {
+            haGroup2.ifPresent(HighAvailabilityGroup::close);
+        }
+
+        // Client will get a different HighAvailabilityGroup when group name 
is different
+        String haGroupName3 = testName.getMethodName() + 
RandomStringUtils.randomAlphabetic(3);
+        CLUSTERS.initClusterRole(haGroupName3, 
HighAvailabilityPolicy.FAILOVER);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName3);
+        Optional<HighAvailabilityGroup> haGroup3 = Optional.empty();
+        try {
+            haGroup3 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup3.isPresent());
+            assertNotSame(haGroup, haGroup3.get());
+        } finally {
+            haGroup3.ifPresent(HighAvailabilityGroup::close);
+        }
+
+        // Client will get the same HighAvailabilityGroup using the same 
information as key again
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, 
haGroup.getGroupInfo().getName());
+        Optional<HighAvailabilityGroup> haGroup4 = Optional.empty();
+        try {
+            haGroup4 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup4.isPresent());
+            assertSame(haGroup, haGroup4.get());
+        } finally {
+            haGroup4.ifPresent(HighAvailabilityGroup::close);
+        }
+    }
+
+    /**
+     * Test that HA group should see latest version of cluster role record.
+     */
+    @Test
+    public void testGetWithDifferentRecordVersion() throws Exception {
+        String haGroupName2 = testName.getMethodName() + 
RandomStringUtils.randomAlphabetic(3);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+
+        // create cluster role records with different versions on two ZK 
clusters
+        final String zpath = ZKPaths.PATH_SEPARATOR + haGroupName2;
+        ClusterRoleRecord record1 = new ClusterRoleRecord(
+                haGroupName2, HighAvailabilityPolicy.FAILOVER,
+                CLUSTERS.getUrl1(), ClusterRole.ACTIVE,
+                CLUSTERS.getUrl2(), ClusterRole.STANDBY,
+                1);
+        CLUSTERS.createCurator1().create().forPath(zpath, 
ClusterRoleRecord.toJson(record1));
+        ClusterRoleRecord record2 = new ClusterRoleRecord(
+                record1.getHaGroupName(), record1.getPolicy(),
+                record1.getZk1(), record1.getRole1(),
+                record1.getZk2(), record1.getRole2(),
+                record1.getVersion() + 1); // record2 is newer
+        CLUSTERS.createCurator2().create().forPath(zpath, 
ClusterRoleRecord.toJson(record2));
+
+        Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
+        try {
+            haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup2.isPresent());
+            assertNotSame(haGroup2.get(), haGroup);
+            // HA group should see latest version when both role managers are 
started
+            HighAvailabilityGroup finalHaGroup = haGroup2.get();
+            waitFor(() -> record2.equals(finalHaGroup.getRoleRecord()), 100, 
30_000);
+        } finally {
+            haGroup2.ifPresent(HighAvailabilityGroup::close);
+        }
+    }
+
+    /**
+     * Test that client can get an HA group when ACTIVE ZK cluster is down.
+     *
+     * NOTE: we can not test with existing HA group because {@link 
HighAvailabilityGroup#get} would
+     * get the cached object, which has also been initialized.
+     *
+     * The reason this works is because getting an HA group depends on one ZK 
watcher connects to a
+     * ZK cluster to get the associated cluster role record. It does not 
depend on both ZK cluster
+     * being up and running. It does not actually create HBase connection 
either.
+     */
+    @Test
+    public void testCanGetHaGroupWhenActiveZKClusterDown() throws Exception {
+        String haGroupName2 = testName.getMethodName() + 2;
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+        CLUSTERS.initClusterRole(haGroupName2, 
HighAvailabilityPolicy.FAILOVER);
+
+      doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
+          HighAvailabilityGroup haGroup2 = null;
+          try {
+              haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties);
+              LOG.info("Can get the new HA group {} after both ZK clusters 
restart", haGroup2);
+          } finally {
+              if (haGroup2 != null) {
+                  haGroup2.close();
+              }
+          }
+      });
+    }
+
+    /**
+     * Test that client can not get an HA group when both ZK clusters are down.
+     *
+     * NOTE: we can not test with existing HA group because {@link 
HighAvailabilityGroup#get} would
+     * get the cached object, which has also been initialized.
+     *
+     * What if two HBase clusters instead of two ZK clusters are down? We may 
still get a new HA
+     * group because creating and initializing HA group do not set up the 
HBase connection, which
+     * should indeed fail when ACTIVE HBase cluster is down.
+     */
+    @Test
+    public void testCanNotGetHaGroupWhenTwoZKClustersDown() throws Exception {
+        String haGroupName2 = testName.getMethodName() + 2;
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+        CLUSTERS.initClusterRole(haGroupName2, 
HighAvailabilityPolicy.FAILOVER);
+
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () ->
+            doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> {
+                try {
+                    HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+                    fail("Should have failed because both ZK cluster were 
shutdown!");
+                } catch (SQLException e) {
+                    LOG.info("Got expected SQLException because both ZK 
clusters are down", e);
+                    assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), 
e.getErrorCode());
+                    assertTrue(e.getCause() instanceof IOException);
+                }
+            })
+        );
+
+        HighAvailabilityGroup haGroup2 = null;
+        try {
+            haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties);
+            LOG.info("Can get the new HA group {} after both ZK clusters 
restart", haGroup2);
+        } finally {
+            if (haGroup2 != null) {
+                haGroup2.close();
+            }
+        }
+    }
+
+    @Test
+    public void testGetHaGroupFailsFastWhenBothZKClusterDownFromBeginning() {
+        String haGroupName = testName.getMethodName();
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+        String badJdbcUrl = String.format("jdbc:phoenix:[%s|%s]", 
"127.0.0.1:0", "127.0.0.1:1");
+        LOG.info("Start testing HighAvailabilityGroup::get() when both ZK 
clusters are down...");
+        long startTime = System.currentTimeMillis();
+        try {
+            HighAvailabilityGroup.get(badJdbcUrl, clientProperties);
+            fail("Should always throw an exception.");
+        } catch (SQLException e){
+            LOG.info("Got expected exception", e);
+            assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), 
e.getErrorCode());
+        } finally {
+            LOG.info("Stop testing HighAvailabilityGroup::get() when both ZK 
clusters are down...");
+            long elapsedTime = System.currentTimeMillis() - startTime;
+            long maxTime = (4 * 
HighAvailabilityGroup.PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT);
+            assertTrue(String.format("Expected elapsed time %d to be less than 
%d",elapsedTime,maxTime), elapsedTime <= maxTime);
+        }
+    }
+
+    /**
+     * Test that it should fail fast to get HA group if the cluster role 
information is not there.
+     */
+    @Test
+    public void testGetShouldFailWithoutClusterRoleData() throws SQLException {
+        String invalidHaGroupName = testName.getMethodName() + 
RandomStringUtils.randomAlphanumeric(3);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, 
invalidHaGroupName);
+        assertFalse(HighAvailabilityGroup.get(jdbcUrl, 
clientProperties).isPresent());
+    }
+
+    /**
+     * Test with invalid High Availability connection string.
+     */
+    @Test
+    public void testGetShouldFailWithNonHAJdbcString() {
+        final String oldJdbcString = String.format("jdbc:phoenix:%s", 
CLUSTERS.getUrl1());
+        try {
+            HighAvailabilityGroup.get(oldJdbcString, clientProperties);
+            fail("Should have failed with invalid connection string '" + 
oldJdbcString + "'");
+        } catch (SQLException e) {
+            LOG.info("Got expected exception with invalid connection string 
{}", oldJdbcString, e);
+            
assertEquals(SQLExceptionCode.MALFORMED_CONNECTION_URL.getErrorCode(), 
e.getErrorCode());
+        }
+    }
+
+    /**
+     * Test that we can connect to this HA group to get a JDBC connection.
+     */
+    @Test
+    public void testConnect() throws SQLException {
+        Connection connection = haGroup.connect(clientProperties);
+        assertNotNull(connection);
+        assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
+    }
+
+    /**
+     * Test connect to one cluster and returns a Phoenix connection which can 
be wrapped.
+     */
+    @Test
+    public void testConnectToOneCluster() throws SQLException {
+        final String url = CLUSTERS.getUrl1();
+        PhoenixConnection connection = haGroup.connectToOneCluster(url, 
clientProperties);
+        // TODO: should we ignore the security info in url?

Review Comment:
   nit: does this TODO need to be addressed?



##########
phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java:
##########
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.apache.hadoop.test.GenericTestUtils.waitFor;
+import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneHBaseDown;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneZKDown;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
+import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for {@link HighAvailabilityGroup} with mini clusters.
+ *
+ * @see HighAvailabilityGroupTestIT
+ */
+@SuppressWarnings("UnstableApiUsage")
+@Category(NeedsOwnMiniClusterTest.class)
+public class HighAvailabilityGroupIT {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HighAvailabilityGroupIT.class);
+    private static final HBaseTestingUtilityPair CLUSTERS = new 
HBaseTestingUtilityPair();
+
+    /** Client properties to create a connection per test. */
+    private Properties clientProperties;
+    /** JDBC connection string for this test HA group. */
+    private String jdbcUrl;
+    /** Failover HA group for to test. */
+    private HighAvailabilityGroup haGroup;
+    /** HA Group name for this test. */
+    private String haGroupName;
+
+    @Rule
+    public final TestName testName = new TestName();
+    @Rule
+    public final Timeout globalTimeout = new Timeout(180, TimeUnit.SECONDS);
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        CLUSTERS.start();
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        HighAvailabilityGroup.CURATOR_CACHE.invalidateAll();
+        DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+        CLUSTERS.close();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        haGroupName = testName.getMethodName();
+        clientProperties = new Properties();
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+        clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY, "3");
+        clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY, 
"2000");
+        // Make first cluster ACTIVE
+        CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER);
+        jdbcUrl = CLUSTERS.getJdbcUrl();
+        haGroup = getHighAvailibilityGroup(jdbcUrl,clientProperties);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        haGroup.close();
+        try {
+            PhoenixDriver.INSTANCE
+                    .getConnectionQueryServices(CLUSTERS.getUrl1(), 
haGroup.getProperties())
+                    .close();
+            PhoenixDriver.INSTANCE
+                    .getConnectionQueryServices(CLUSTERS.getUrl2(), 
haGroup.getProperties())
+                    .close();
+        } catch (Exception e) {
+            LOG.error("Fail to tear down the HA group and the CQS. Will 
ignore", e);
+        }
+    }
+
+    /**
+     * Test get static method.
+     */
+    @Test
+    public void testGet() throws Exception {
+        // Client will get the same HighAvailabilityGroup using the same 
information as key
+        Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
+        try {
+            haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup2.isPresent());
+            assertSame(haGroup, haGroup2.get());
+        } finally {
+            haGroup2.ifPresent(HighAvailabilityGroup::close);
+        }
+
+        // Client will get a different HighAvailabilityGroup when group name 
is different
+        String haGroupName3 = testName.getMethodName() + 
RandomStringUtils.randomAlphabetic(3);
+        CLUSTERS.initClusterRole(haGroupName3, 
HighAvailabilityPolicy.FAILOVER);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName3);
+        Optional<HighAvailabilityGroup> haGroup3 = Optional.empty();
+        try {
+            haGroup3 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup3.isPresent());
+            assertNotSame(haGroup, haGroup3.get());
+        } finally {
+            haGroup3.ifPresent(HighAvailabilityGroup::close);
+        }
+
+        // Client will get the same HighAvailabilityGroup using the same 
information as key again
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, 
haGroup.getGroupInfo().getName());
+        Optional<HighAvailabilityGroup> haGroup4 = Optional.empty();
+        try {
+            haGroup4 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup4.isPresent());
+            assertSame(haGroup, haGroup4.get());
+        } finally {
+            haGroup4.ifPresent(HighAvailabilityGroup::close);
+        }
+    }
+
+    /**
+     * Test that HA group should see latest version of cluster role record.
+     */
+    @Test
+    public void testGetWithDifferentRecordVersion() throws Exception {
+        String haGroupName2 = testName.getMethodName() + 
RandomStringUtils.randomAlphabetic(3);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+
+        // create cluster role records with different versions on two ZK 
clusters
+        final String zpath = ZKPaths.PATH_SEPARATOR + haGroupName2;
+        ClusterRoleRecord record1 = new ClusterRoleRecord(
+                haGroupName2, HighAvailabilityPolicy.FAILOVER,
+                CLUSTERS.getUrl1(), ClusterRole.ACTIVE,
+                CLUSTERS.getUrl2(), ClusterRole.STANDBY,
+                1);
+        CLUSTERS.createCurator1().create().forPath(zpath, 
ClusterRoleRecord.toJson(record1));
+        ClusterRoleRecord record2 = new ClusterRoleRecord(
+                record1.getHaGroupName(), record1.getPolicy(),
+                record1.getZk1(), record1.getRole1(),
+                record1.getZk2(), record1.getRole2(),
+                record1.getVersion() + 1); // record2 is newer
+        CLUSTERS.createCurator2().create().forPath(zpath, 
ClusterRoleRecord.toJson(record2));
+
+        Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
+        try {
+            haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup2.isPresent());
+            assertNotSame(haGroup2.get(), haGroup);
+            // HA group should see latest version when both role managers are 
started
+            HighAvailabilityGroup finalHaGroup = haGroup2.get();
+            waitFor(() -> record2.equals(finalHaGroup.getRoleRecord()), 100, 
30_000);
+        } finally {
+            haGroup2.ifPresent(HighAvailabilityGroup::close);
+        }
+    }
+
+    /**
+     * Test that client can get an HA group when ACTIVE ZK cluster is down.
+     *
+     * NOTE: we can not test with existing HA group because {@link 
HighAvailabilityGroup#get} would
+     * get the cached object, which has also been initialized.
+     *
+     * The reason this works is because getting an HA group depends on one ZK 
watcher connects to a
+     * ZK cluster to get the associated cluster role record. It does not 
depend on both ZK cluster
+     * being up and running. It does not actually create HBase connection 
either.
+     */
+    @Test
+    public void testCanGetHaGroupWhenActiveZKClusterDown() throws Exception {
+        String haGroupName2 = testName.getMethodName() + 2;
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+        CLUSTERS.initClusterRole(haGroupName2, 
HighAvailabilityPolicy.FAILOVER);
+
+      doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
+          HighAvailabilityGroup haGroup2 = null;
+          try {
+              haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties);
+              LOG.info("Can get the new HA group {} after both ZK clusters 
restart", haGroup2);
+          } finally {
+              if (haGroup2 != null) {
+                  haGroup2.close();
+              }
+          }
+      });
+    }
+
+    /**
+     * Test that client can not get an HA group when both ZK clusters are down.
+     *
+     * NOTE: we can not test with existing HA group because {@link 
HighAvailabilityGroup#get} would
+     * get the cached object, which has also been initialized.
+     *
+     * What if two HBase clusters instead of two ZK clusters are down? We may 
still get a new HA
+     * group because creating and initializing HA group do not set up the 
HBase connection, which
+     * should indeed fail when ACTIVE HBase cluster is down.
+     */
+    @Test
+    public void testCanNotGetHaGroupWhenTwoZKClustersDown() throws Exception {
+        String haGroupName2 = testName.getMethodName() + 2;
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+        CLUSTERS.initClusterRole(haGroupName2, 
HighAvailabilityPolicy.FAILOVER);
+
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () ->
+            doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> {
+                try {
+                    HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+                    fail("Should have failed because both ZK cluster were 
shutdown!");
+                } catch (SQLException e) {
+                    LOG.info("Got expected SQLException because both ZK 
clusters are down", e);
+                    assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), 
e.getErrorCode());
+                    assertTrue(e.getCause() instanceof IOException);
+                }
+            })
+        );
+
+        HighAvailabilityGroup haGroup2 = null;
+        try {
+            haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties);
+            LOG.info("Can get the new HA group {} after both ZK clusters 
restart", haGroup2);
+        } finally {
+            if (haGroup2 != null) {
+                haGroup2.close();
+            }
+        }
+    }
+
+    @Test
+    public void testGetHaGroupFailsFastWhenBothZKClusterDownFromBeginning() {
+        String haGroupName = testName.getMethodName();
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+        String badJdbcUrl = String.format("jdbc:phoenix:[%s|%s]", 
"127.0.0.1:0", "127.0.0.1:1");
+        LOG.info("Start testing HighAvailabilityGroup::get() when both ZK 
clusters are down...");
+        long startTime = System.currentTimeMillis();
+        try {
+            HighAvailabilityGroup.get(badJdbcUrl, clientProperties);
+            fail("Should always throw an exception.");
+        } catch (SQLException e){
+            LOG.info("Got expected exception", e);
+            assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), 
e.getErrorCode());
+        } finally {
+            LOG.info("Stop testing HighAvailabilityGroup::get() when both ZK 
clusters are down...");
+            long elapsedTime = System.currentTimeMillis() - startTime;
+            long maxTime = (4 * 
HighAvailabilityGroup.PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT);
+            assertTrue(String.format("Expected elapsed time %d to be less than 
%d",elapsedTime,maxTime), elapsedTime <= maxTime);
+        }
+    }
+
+    /**
+     * Test that it should fail fast to get HA group if the cluster role 
information is not there.
+     */
+    @Test
+    public void testGetShouldFailWithoutClusterRoleData() throws SQLException {
+        String invalidHaGroupName = testName.getMethodName() + 
RandomStringUtils.randomAlphanumeric(3);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, 
invalidHaGroupName);
+        assertFalse(HighAvailabilityGroup.get(jdbcUrl, 
clientProperties).isPresent());
+    }
+
+    /**
+     * Test with invalid High Availability connection string.
+     */
+    @Test
+    public void testGetShouldFailWithNonHAJdbcString() {
+        final String oldJdbcString = String.format("jdbc:phoenix:%s", 
CLUSTERS.getUrl1());
+        try {
+            HighAvailabilityGroup.get(oldJdbcString, clientProperties);
+            fail("Should have failed with invalid connection string '" + 
oldJdbcString + "'");
+        } catch (SQLException e) {
+            LOG.info("Got expected exception with invalid connection string 
{}", oldJdbcString, e);
+            
assertEquals(SQLExceptionCode.MALFORMED_CONNECTION_URL.getErrorCode(), 
e.getErrorCode());
+        }
+    }
+
+    /**
+     * Test that we can connect to this HA group to get a JDBC connection.
+     */
+    @Test
+    public void testConnect() throws SQLException {
+        Connection connection = haGroup.connect(clientProperties);
+        assertNotNull(connection);
+        assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
+    }
+
+    /**
+     * Test connect to one cluster and returns a Phoenix connection which can 
be wrapped.
+     */
+    @Test
+    public void testConnectToOneCluster() throws SQLException {
+        final String url = CLUSTERS.getUrl1();
+        PhoenixConnection connection = haGroup.connectToOneCluster(url, 
clientProperties);
+        // TODO: should we ignore the security info in url?
+        assertEquals(url, connection.getURL());
+
+        try {
+            haGroup.connectToOneCluster(null, clientProperties);
+            fail("Should have failed since null is not in any HA group");
+        } catch (Exception e) {
+            LOG.info("Got expected exception with invalid null host url", e);
+        }
+
+        final String randomHostUrl = String.format("%s:%d",
+                RandomStringUtils.randomAlphabetic(4), 
RandomUtils.nextInt(0,65536));
+        try {
+            haGroup.connectToOneCluster(randomHostUrl, clientProperties);
+            fail("Should have failed since '" + randomHostUrl + "' is not in 
HA group " + haGroup);
+        } catch (IllegalArgumentException e) {
+            LOG.info("Got expected exception with invalid host url '{}'", 
randomHostUrl, e);
+        }
+    }
+
+    /**
+     * Test that it can connect to a given cluster in this HA group after ZK 
service restarts.
+     *
+     * Method {@link HighAvailabilityGroup#connectToOneCluster(String, 
Properties)} is used by
+     * Phoenix HA framework to connect to one specific HBase cluster in this 
HA group.  The cluster
+     * may not necessarily be in ACTIVE role.  For example, parallel HA 
connection needs to connect
+     * to both clusters. This tests that it can connect to a specific ZK 
cluster after ZK restarts.
+     */
+    @Test
+    public void testConnectToOneClusterAfterZKRestart() throws Exception {
+        final String tableName = testName.getMethodName();
+        CLUSTERS.createTableOnClusterPair(tableName);
+
+        final String url1 = CLUSTERS.getUrl1();
+        final String jdbcUrlToCluster1 = "jdbc:phoenix:" + url1;
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
+            try {
+                DriverManager.getConnection(jdbcUrlToCluster1);
+            } catch (SQLException e) {
+                LOG.info("Got expected IOException when creating Phoenix 
connection", e);
+            }
+        });
+
+        // test with plain JDBC connection after cluster restarts
+        try (Connection conn = DriverManager.getConnection(jdbcUrlToCluster1)) 
{
+            doTestBasicOperationsWithConnection(conn, tableName, null);
+        }
+        // test with HA group to get connection to one cluster
+        try (Connection conn = haGroup.connectToOneCluster(jdbcUrlToCluster1, 
clientProperties)) {
+            doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        }
+    }
+
+    /**
+     * Test {@link HighAvailabilityGroup#isActive(PhoenixConnection)}.
+     */
+    @Test
+    public void testIsConnectionActive() throws Exception {
+        PhoenixConnection conn1 = 
haGroup.connectToOneCluster(CLUSTERS.getUrl1(), clientProperties);
+        assertTrue(haGroup.isActive(conn1));
+        PhoenixConnection conn2 = 
haGroup.connectToOneCluster(CLUSTERS.getUrl2(), clientProperties);
+        assertFalse(haGroup.isActive(conn2));
+
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.ACTIVE);
+
+        assertFalse(haGroup.isActive(conn1));
+        assertTrue(haGroup.isActive(conn2));
+    }
+
+    /**
+     * Test that when node changes, the high availability group will detect 
and issue state change.
+     */
+    @Test
+    public void testNodeChange() throws Exception {
+        assertEquals(ClusterRole.ACTIVE, 
haGroup.getRoleRecord().getRole(CLUSTERS.getUrl1()));
+        assertEquals(ClusterRole.STANDBY, 
haGroup.getRoleRecord().getRole(CLUSTERS.getUrl2()));
+
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.ACTIVE);
+
+        assertEquals(ClusterRole.STANDBY, 
haGroup.getRoleRecord().getRole(CLUSTERS.getUrl1()));
+        assertEquals(ClusterRole.ACTIVE, 
haGroup.getRoleRecord().getRole(CLUSTERS.getUrl2()));
+    }
+
+    /**
+     * Test that if STANDBY HBase cluster is down, the connect should work.
+     */
+    @Test
+    public void testCanConnectWhenStandbyHBaseClusterDown() throws Exception {
+        doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster2(), () -> {
+            // HA group is already initialized
+            Connection connection = haGroup.connect(clientProperties);
+            assertNotNull(connection);
+            assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
+        });
+    }
+
+    /**
+     * Test that if STANDBY ZK cluster is down, the connect should work.
+     *
+     * This differs from {@link #testCanConnectWhenStandbyHBaseClusterDown} 
because this stops the
+     * ZK cluster, not the HBase cluster.
+     */
+    @Test
+    public void testCanConnectWhenStandbyZKClusterDown() throws Exception {
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> {
+            // Clear the HA Cache
+            HighAvailabilityGroup.CURATOR_CACHE.invalidateAll();
+
+            // HA group is already initialized
+            Connection connection = haGroup.connect(clientProperties);
+            assertNotNull(connection);
+            assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
+        });
+    }
+
+    /**
+     * Test that if STANDBY HBase cluster is down, connect to new HA group 
should work.
+     *
+     * This test covers only HBase cluster is down, and both ZK clusters are 
still healthy so
+     * clients will be able to get latest clusters role record from both 
clusters. This tests a new
+     * HA group which will get initialized during the STANDBY HBase cluster 
down time.
+     */
+    @Test
+    public void testCanConnectNewGroupWhenStandbyHBaseClusterDown() throws 
Exception {
+        doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster2(), () -> {
+            // get and initialize a new HA group when cluster2 is down
+            String haGroupName2 = testName.getMethodName() + 
RandomStringUtils.randomAlphabetic(3);
+            clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+            CLUSTERS.initClusterRole(haGroupName2, 
HighAvailabilityPolicy.FAILOVER);
+            Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
+            try {
+                haGroup2 = HighAvailabilityGroup.get(jdbcUrl, 
clientProperties);
+                assertTrue(haGroup2.isPresent());
+                assertNotSame(haGroup2.get(), haGroup);
+                // get a new connection in this new HA group; should be 
pointing to ACTIVE cluster1
+                try (Connection connection = 
haGroup2.get().connect(clientProperties)) {
+                    assertNotNull(connection);
+                    
assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
+                }
+            } finally {
+                haGroup2.ifPresent(HighAvailabilityGroup::close);
+            }
+        });
+    }
+
+    /**
+     * Test that if STANDBY cluster ZK service is down, connect to new HA group should work.
+     *
+     * This differs from {@link #testCanConnectNewGroupWhenStandbyHBaseClusterDown} because this is
+     * testing scenarios when STANDBY ZK cluster is down.
+     */
+    @Test
+    public void testCanConnectNewGroupWhenStandbyZKClusterDown() throws Exception {
+        // create the cluster role record of a new HA group before the ZK of cluster2 is stopped
+        String haGroupName2 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+        CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER);
+
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> {
+            Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
+            try {
+                // get the new HA group while the ZK of cluster2 is down
+                haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+                assertTrue(haGroup2.isPresent());
+                assertNotSame(haGroup2.get(), haGroup);
+                // get a new connection in this new HA group; should be pointing to ACTIVE cluster1
+                // (try-with-resources so the connection is closed even if an assertion fails)
+                try (Connection connection = haGroup2.get().connect(clientProperties)) {
+                    assertNotNull(connection);
+                    assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
+                }
+            } finally {
+                // always release the extra HA group created by this test
+                haGroup2.ifPresent(HighAvailabilityGroup::close);
+            }
+        });
+    }
+
+    /**
+     * Test it can not establish active connection to the ACTIVE HBase cluster 
if it is down.
+     */
+    @Test
+    public void testCanNotEstablishConnectionWhenActiveHBaseClusterDown() 
throws Exception {
+        doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster1(), () -> {
+            try {
+                haGroup.connectActive(clientProperties);
+                fail("Should have failed because ACTIVE HBase cluster is 
down.");
+            } catch (SQLException e) {
+                LOG.info("Got expected exception when ACTIVE HBase cluster is 
down", e);
+                assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), 
e.getErrorCode());
+            }
+        });
+    }
+
+     /**
+      * Test that the client cannot establish a connection when the ACTIVE ZK 
cluster is down,
+      * while client can establish active connection after active ZK cluster 
restarts.
+      *
+      * This differs from the {@link 
#testCanNotEstablishConnectionWhenActiveHBaseClusterDown()}
+      * because this is for ZK cluster down while the other is for HBase 
cluster down.
+      */
+    @Test
+    public void testConnectActiveWhenActiveZKClusterRestarts() throws 
Exception {
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
+            try {
+                haGroup.connectActive(clientProperties);
+                fail("Should have failed because of ACTIVE ZK cluster is 
down.");
+            } catch (SQLException e) {
+                LOG.info("Got expected exception when ACTIVE ZK cluster is 
down", e);
+                assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), 
e.getErrorCode());
+            }
+        });
+
+        try (Connection conn = haGroup.connectActive(clientProperties)) {
+            assertNotNull(conn);
+            LOG.info("Successfully connect to HA group {} after restarting 
ACTIVE ZK", haGroup);
+        } // all other exceptions will fail the test
+    }
+
+    /**
+     * Test when one ZK starts after the HA group has been initialized.
+     *
+     * In this case, both cluster role managers will start and apply 
discovered cluster role record.
+     */
+    @Test
+    public void testOneZKStartsAfterInit() throws Exception {
+        String haGroupName2 = testName.getMethodName() + 
RandomStringUtils.randomAlphabetic(3);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+
+        // create cluster role records with different versions on two ZK 
clusters
+        final String zpath = ZKPaths.PATH_SEPARATOR + haGroupName2;
+        ClusterRoleRecord record1 = new ClusterRoleRecord(
+                haGroupName2, HighAvailabilityPolicy.FAILOVER,
+                CLUSTERS.getUrl1(), ClusterRole.ACTIVE,
+                CLUSTERS.getUrl2(), ClusterRole.STANDBY,
+                1);
+        CLUSTERS.createCurator1().create().forPath(zpath, 
ClusterRoleRecord.toJson(record1));
+        ClusterRoleRecord record2 = new ClusterRoleRecord(
+                record1.getHaGroupName(), record1.getPolicy(),
+                record1.getZk1(), record1.getRole1(),
+                record1.getZk2(), record1.getRole2(),
+                record1.getVersion() + 1); // record2 is newer
+        CLUSTERS.createCurator2().create().forPath(zpath, 
ClusterRoleRecord.toJson(record2));
+
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> {
+            Optional<HighAvailabilityGroup> haGroup2 = 
HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup2.isPresent());
+            assertNotSame(haGroup2.get(), haGroup);
+            // apparently the HA group cluster role record should be from the 
healthy cluster
+            assertEquals(record1, haGroup2.get().getRoleRecord());
+        });
+
+        // When ZK2 is connected, its cluster role manager should apply newer 
cluster role record
+        waitFor(() -> {
+            try {
+                Optional<HighAvailabilityGroup> haGroup2 = 
HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+                return haGroup2.isPresent() && 
record2.equals(haGroup2.get().getRoleRecord());
+            } catch (SQLException e) {
+                LOG.warn("Fail to get HA group {}", haGroupName2);
+                return false;
+            }
+        }, 100, 30_000);
+
+        // clean up HA group 2
+        HighAvailabilityGroup.get(jdbcUrl, 
clientProperties).ifPresent(HighAvailabilityGroup::close);
+    }
+
+    /**
+     * Test that HA connection request will fall back to the first cluster 
when HA group fails
+     * to initialize due to missing cluster role record (CRR).
+     */
+    @Test
+    public void testFallbackToSingleConnection() throws Exception {
+        final String tableName = testName.getMethodName();
+        CLUSTERS.createTableOnClusterPair(tableName);
+
+        String haGroupName2 = testName.getMethodName() + 
RandomStringUtils.randomAlphabetic(3);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+        // no cluster role record for the HA group with name haGroupName2
+        try (Connection conn = DriverManager.getConnection(jdbcUrl, 
clientProperties)) {
+            // connection is PhoenixConnection instead of HA connection 
(failover or parallel)

Review Comment:
   Interesting (and good) fallback behavior. I'm sure it's somewhere below, but 
do we log anywhere that this fallback happened? (Seems like a WARN level sort 
of thing.) 



##########
phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java:
##########
@@ -0,0 +1,879 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
+import static org.apache.hadoop.test.GenericTestUtils.waitFor;
+import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION;
+import static 
org.apache.phoenix.jdbc.FailoverPhoenixConnection.FAILOVER_TIMEOUT_MS_ATTR;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneZKDown;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithStatement;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup;
+import static 
org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.FailoverSQLException;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test failover basics for {@link FailoverPhoenixConnection}.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class FailoverPhoenixConnectionIT {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FailoverPhoenixConnectionIT.class);
+    private static final HBaseTestingUtilityPair CLUSTERS = new 
HBaseTestingUtilityPair();
+
+    @Rule
+    public final TestName testName = new TestName();
+    @Rule
+    public final Timeout globalTimeout= new Timeout(300, TimeUnit.SECONDS);
+
+    /** Client properties to create a connection per test. */
+    private Properties clientProperties;
+    /** HA group for this test. */
+    private HighAvailabilityGroup haGroup;
+    /** Table name per test case. */
+    private String tableName;
+    /** HA Group name for this test. */
+    private String haGroupName;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        CLUSTERS.start();
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+        CLUSTERS.close();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        haGroupName = testName.getMethodName();
+        clientProperties = new Properties();
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+        // Set some client configurations to make test run faster
+        clientProperties.setProperty(COLLECT_REQUEST_LEVEL_METRICS, 
String.valueOf(true));
+        clientProperties.setProperty(FAILOVER_TIMEOUT_MS_ATTR, "30000");
+        clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY, "3");
+        clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY, 
"1000");
+        clientProperties.setProperty(ZK_SESSION_TIMEOUT, "3000");
+        clientProperties.setProperty(PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY, 
"3000");
+        clientProperties.setProperty("zookeeper.recovery.retry.maxsleeptime", 
"1000");
+
+        // Make first cluster ACTIVE
+        CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER);
+
+        haGroup = getHighAvailibilityGroup(CLUSTERS.getJdbcUrl(), 
clientProperties);
+        LOG.info("Initialized haGroup {} with URL {}", haGroup, 
CLUSTERS.getJdbcUrl());
+        tableName = testName.getMethodName().toUpperCase();
+        CLUSTERS.createTableOnClusterPair(tableName);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            haGroup.close();
+            PhoenixDriver.INSTANCE
+                    .getConnectionQueryServices(CLUSTERS.getUrl1(), 
haGroup.getProperties())
+                    .close();
+            PhoenixDriver.INSTANCE
+                    .getConnectionQueryServices(CLUSTERS.getUrl2(), 
haGroup.getProperties())
+                    .close();
+        } catch (Exception e) {
+            LOG.error("Fail to tear down the HA group and the CQS. Will 
ignore", e);
+        }
+    }
+
+    /**
+     * Test Phoenix connection creation and basic operations with HBase 
cluster pair.
+     */
+    @Test
+    public void testOperationUsingConnection() throws Exception {
+        try (Connection conn = createFailoverConnection()) {
+            doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        }
+    }
+
+    /**
+     * Test close() once more should not fail, as the second close should be a 
no-op.
+     */
+    @Test
+    public void testCloseConnectionOnceMore() throws Exception {
+        Connection conn = createFailoverConnection();
+        doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        conn.close();
+        conn.close(); // this is NOT duplicate code, but instead this is 
essential for this test.
+    }
+
+    /**
+     * Tests that new Phoenix connections are not created during failover.
+     */
+    @Test
+    public void testConnectionCreationFailsIfNoActiveCluster() throws 
Exception {
+        try (Connection conn = createFailoverConnection()) {
+            doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        }
+
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.STANDBY);
+
+        try {
+            createFailoverConnection();
+            fail("Should have failed because neither cluster is ACTIVE");
+        } catch (SQLException e) {
+            LOG.info("Got expected exception when creating new connection", e);
+            assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), 
e.getErrorCode());
+        } // all other type of exception will fail this test.
+    }
+
+    /**
+     * Tests new Phoenix connections are created if one cluster is OFFLINE and 
the other ACTIVE.
+     */
+    @Test
+    public void testConnectionOneOfflineOneActive() throws Exception {
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.OFFLINE, 
ClusterRole.ACTIVE);
+
+        try (Connection conn = createFailoverConnection()) {
+            doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        }
+    }
+
+    /**
+     * Test that failover can finish according to the policy.
+     *
+     * In this test case, there is no existing CQS or open connections to 
close.
+     *
+     * @see #testFailoverCanFinishWhenOneZKDownWithCQS
+     */
+    @Test
+    public void testFailoverCanFinishWhenOneZKDown() throws Exception {
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
+            // Because cluster1 is down, any version record will only be 
updated in cluster2.
+            // Because the ACTIVE cluster is still ACTIVE in the current version 
record, there is no CQS to be closed.
+            CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE, 
ClusterRole.OFFLINE);
+
+            // Usually making this cluster STANDBY will close the CQS and all 
existing connections.
+            // The HA group was created in setup() but no CQS is yet opened 
for this ACTIVE cluster.
+            // As a result there is neither the CQS nor existing opened 
connections.
+            // In this case, the failover should finish instead of failing to 
get/close the CQS.
+            CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.ACTIVE);
+
+            try (Connection conn = createFailoverConnection()) {
+                FailoverPhoenixConnection failoverConn = 
(FailoverPhoenixConnection) conn;
+                assertEquals(CLUSTERS.getUrl2(), 
failoverConn.getWrappedConnection().getURL());
+                doTestBasicOperationsWithConnection(conn, tableName, 
haGroupName);
+            }
+        });
+    }
+
+    /**
+     * Test that policy can close bad CQS and then finish failover according 
to the policy.
+     *
+     * Internally, the failover will get the CQS for this HA group and try to 
close all connections.
+     * However, the CQS may be in a bad state since the target ZK cluster is 
down. This test is for
+     * the scenario that when CQS is in bad state, failover should finish 
transition so HA is ready.
+     *
+     * @see #testFailoverCanFinishWhenOneZKDown
+     */
+    @Test
+    public void testFailoverCanFinishWhenOneZKDownWithCQS() throws Exception {
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
+            // Try to make a connection to current ACTIVE cluster, which 
should fail.
+            try {
+                createFailoverConnection();
+                fail("Should have failed since ACTIVE ZK '" + 
CLUSTERS.getUrl1() + "' is down");
+            } catch (SQLException e) {
+                LOG.info("Got expected exception when ACTIVE ZK cluster is 
down", e);
+            }
+
+            // As the target ZK is down, now existing CQS if any is in a bad 
state. In this case,
+            // the failover should still finish. After all, this is the most 
important use case the
+            // failover is designed for.
+            CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.ACTIVE);
+
+            try (Connection conn = createFailoverConnection()) {
+                FailoverPhoenixConnection failoverConn = 
(FailoverPhoenixConnection) conn;
+                assertEquals(CLUSTERS.getUrl2(), 
failoverConn.getWrappedConnection().getURL());
+                doTestBasicOperationsWithConnection(conn, tableName, 
haGroupName);
+            }
+        });
+    }
+
+    /**
+     * Tests the scenario where ACTIVE ZK cluster restarts.
+     *
+     * When ACTIVE ZK cluster shutdown, client can not connect to this HA group.
+     * After failover, client can connect to this HA group (against the new ACTIVE cluster2).
+     * After restarts and fails back, client can connect to this HA group (against cluster 1).
+     */
+    @Test
+    public void testConnectionWhenActiveZKRestarts() throws Exception {
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
+            try {
+                createFailoverConnection();
+                fail("Should have failed since ACTIVE ZK cluster was shutdown");
+            } catch (SQLException e) {
+                LOG.info("Got expected exception when ACTIVE ZK cluster is down", e);
+            }
+
+            // So on-call engineer comes into play, and triggers a failover
+            // Because cluster1 is down, new version record will only be updated in cluster2
+            CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE);
+
+            // After failover, the FailoverPhoenixConnection should go to the second cluster
+            try (Connection conn = createFailoverConnection()) {
+                doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+            }
+        });
+
+        // After cluster1 comes back, new connection should still go to cluster2.
+        // This is the case where ZK1 believes itself is ACTIVE, with role record v1
+        // while ZK2 believes itself is ACTIVE, with role record v2.
+        // Client should believe ZK2 is ACTIVE in this case because high version wins.
+        LOG.info("Testing failover connection when both clusters are up and running");
+        try (Connection conn = createFailoverConnection()) {
+            FailoverPhoenixConnection failoverConn = conn.unwrap(FailoverPhoenixConnection.class);
+            assertEquals(CLUSTERS.getUrl2(), failoverConn.getWrappedConnection().getURL());
+            doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        }
+
+        LOG.info("Testing failover back to cluster1 when both clusters are up and running");
+        // Failover back to the first cluster since it is healthy after restart
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE, ClusterRole.STANDBY);
+        // After ACTIVE ZK restarts and fail back, this should still work
+        try (Connection conn = createFailoverConnection()) {
+            doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        }
+    }
+
+    /**
+     * Tests the scenario where STANDBY ZK cluster restarts.
+     *
+     * When STANDBY ZK cluster shutdown, client can still connect to this HA 
group.
+     * After failover, client can not connect to this HA group (ACTIVE cluster 
 2 is down).
+     * After cluster 2 (ACTIVE) restarts, client can connect to this HA group.
+     */
+    @Test
+    public void testConnectionWhenStandbyZKRestarts() throws Exception {
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> {
+            try (Connection conn = createFailoverConnection()) {
+                doTestBasicOperationsWithConnection(conn, tableName, 
haGroupName);
+            }
+
+            // Innocent on-call engineer triggers a failover to second cluster
+            CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.ACTIVE);
+
+            try {
+                createFailoverConnection();
+                fail("Should have failed since ACTIVE ZK cluster was 
shutdown");
+            } catch (SQLException e) {
+                LOG.info("Got expected exception when ACTIVE ZK cluster {} was 
shutdown",
+                        CLUSTERS.getUrl2(), e);
+            }
+        });
+
+        // After the second cluster (ACTIVE) ZK restarts, this should still 
work
+        try (Connection conn = createFailoverConnection()) {
+            doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        }
+    }
+
+    /**
+     * Tests the scenario where two ZK clusters both restart.
+     *
+     * Client can not connect to this HA group when two ZK clusters are both 
down.
+     * When STANDBY ZK cluster2 first restarts, client still can not connect 
to this HA group.
+     * After failover, client can connect to this HA group because cluster 2 
is ACTIVE and healthy.
+     */
+    @Test
+    public void testConnectionWhenTwoZKRestarts() throws Exception {
+      doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
+          doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> {
+              try {
+                  createFailoverConnection();
+                  fail("Should have failed since ACTIVE ZK cluster was 
shutdown");
+              } catch (SQLException e) {
+                  LOG.info("Got expected exception when both clusters are 
down", e);
+              }
+          });
+
+          // Client cannot connect to HA group because cluster 2 is still 
STANDBY after restarted
+          try {
+              createFailoverConnection();
+              fail("Should have failed since ACTIVE ZK cluster was shutdown");
+          } catch (SQLException e) {
+              LOG.info("Got expected exception when ACTIVE ZK cluster {} was 
shutdown",
+                      CLUSTERS.getUrl2(), e);
+          }
+
+          // So on-call engineer comes into play, and triggers a failover
+          CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.ACTIVE);
+
+          // After the second cluster (ACTIVE) ZK restarts, this should still 
work
+          try (Connection conn = createFailoverConnection()) {
+              doTestBasicOperationsWithConnection(conn, tableName, 
haGroupName);
+          }
+      });
+    }
+
+    /**
+     * Tests that new Phoenix connections are not created if both clusters are 
OFFLINE.
+     */
+    @Test
+    public void testConnectionCreationFailsIfBothClustersOffline() throws 
Exception {
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.OFFLINE, 
ClusterRole.OFFLINE);
+
+        try {
+            createFailoverConnection();
+            fail("Should have failed because both clusters are OFFLINE");
+        } catch (SQLException e) {
+            LOG.info("Got expected exception when creating new connection", e);
+            assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), 
e.getErrorCode());
+        } // all other types of exceptions will fail this test.
+    }
+
+    /**
+     * Tests that existing wrapped Phoenix connection is closed in the 
Failover event.
+     */
+    @Test
+    public void testWrappedConnectionClosedAfterStandby() throws Exception {
+        Connection conn = createFailoverConnection();
+        doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.ACTIVE);
+
+        // The wrapped connection is still against the first cluster, and is 
closed
+        PhoenixConnection pc = 
((FailoverPhoenixConnection)conn).getWrappedConnection();
+        assertNotNull(pc);
+        assertEquals(CLUSTERS.getUrl1(), pc.getURL());
+        assertTrue(pc.isClosed());
+        doTestActionShouldFailBecauseOfFailover(conn::createStatement);
+    }
+
+    /**
+     * Tests that existing Phoenix statement is closed when cluster transits 
into STANDBY.
+     */
+    @Test
+    public void testStatementClosedAfterStandby() throws Exception {
+        Connection conn = createFailoverConnection();
+        Statement stmt = conn.createStatement();
+        doTestBasicOperationsWithStatement(conn, stmt, tableName);
+
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.ACTIVE);
+
+        assertFalse(conn.isClosed());
+        assertTrue(stmt.isClosed());
+        doTestActionShouldFailBecauseOfFailover(
+                () -> stmt.executeQuery("SELECT * FROM " + tableName));
+    }
+
+    /**
+     * Tests non-HA connection (vanilla Phoenix connection) is intact when 
cluster role transits.
+     *
+     * The reason is that, high availability group has its own CQSI which 
tracks only those Phoenix
+     * connections that are wrapped by failover connections.
+     */
+    @Test
+    public void testNonHAConnectionNotClosedAfterFailover() throws Exception {
+        String firstUrl = String.format("jdbc:phoenix:%s", CLUSTERS.getUrl1());
+        // This is a vanilla Phoenix connection without using high 
availability (HA) feature.
+        Connection phoenixConn = DriverManager.getConnection(firstUrl, new 
Properties());
+        Connection failoverConn = createFailoverConnection();
+        PhoenixConnection wrappedConn = ((FailoverPhoenixConnection) 
failoverConn)
+                .getWrappedConnection();
+
+        assertFalse(phoenixConn.isClosed());
+        assertFalse(failoverConn.isClosed());
+        assertFalse(wrappedConn.isClosed());
+
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.ACTIVE);
+
+        assertFalse(phoenixConn.isClosed()); // normal Phoenix connection is 
not closed
+        assertFalse(failoverConn.isClosed()); // failover connection is not 
closed by close() method
+        assertTrue(wrappedConn.isClosed());
+    }
+
+    /**
+     * Tests that one HA group cluster role transit will not affect 
connections in other HA groups.
+     */
+    @Test
+    public void testOtherHAGroupConnectionUnchanged() throws Exception {
+        Connection conn = createFailoverConnection();
+        PhoenixConnection wrappedConn = ((FailoverPhoenixConnection) 
conn).getWrappedConnection();
+        // Next, we create a new HA group and a connection against 
this HA group
+        String haGroupName2 = haGroup.getGroupInfo().getName() + "2";
+        CLUSTERS.initClusterRole(haGroupName2, 
HighAvailabilityPolicy.FAILOVER);
+        Properties clientProperties2 = new Properties(clientProperties);
+        clientProperties2.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+        Connection conn2 = DriverManager.getConnection(CLUSTERS.getJdbcUrl(), 
clientProperties2);
+        PhoenixConnection wrappedConn2 = ((FailoverPhoenixConnection) 
conn2).getWrappedConnection();
+
+        assertFalse(wrappedConn.isClosed());
+        assertFalse(wrappedConn2.isClosed());
+
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.ACTIVE);
+
+        assertTrue(wrappedConn.isClosed());
+        assertFalse(wrappedConn2.isClosed());
+    }
+
+    /**
+     * Test that failover can finish even if one connection can not be closed.
+     *
+     * When one cluster becomes STANDBY from ACTIVE, all its connections and 
the associated CQS
+     * will get closed asynchronously. In case of errors when closing those 
connections and CQS,
+     * the HA group is still able to transit to target state after the maximum 
timeout.
+     * Closing the existing connections is guaranteed with best effort and 
timeout in favor of
+     * improved availability.
+     *
+     * @see #testFailoverTwice which fails over back to the first cluster
+     */
+    @Test
+    public void testFailoverCanFinishWhenOneConnectionGotStuckClosing() throws 
Exception {
+        Connection conn = createFailoverConnection();
+        doTestBasicOperationsWithConnection(conn, tableName, haGroupName);
+        assertEquals(CLUSTERS.getUrl1(),  // active connection is against the 
first cluster
+                
conn.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL());
+
+        // Spy the wrapped connection
+        Connection wrapped = 
conn.unwrap(FailoverPhoenixConnection.class).getWrappedConnection();
+        Connection spy = Mockito.spy(wrapped);
+        final CountDownLatch latch = new CountDownLatch(1);
+        // Make close() stuck before closing
+        doAnswer((invocation) -> {
+            latch.await();
+            invocation.callRealMethod();
+            return null;
+        }).when(spy).close();
+        ConnectionQueryServices cqs = PhoenixDriver.INSTANCE
+                .getConnectionQueryServices(CLUSTERS.getUrl1(), 
clientProperties);
+        // replace the wrapped connection with the spied connection in CQS
+        cqs.removeConnection(wrapped.unwrap(PhoenixConnection.class));
+        cqs.addConnection(spy.unwrap(PhoenixConnection.class));
+
+        // (ACTIVE, STANDBY) -> (STANDBY, ACTIVE)
+        // The transition will finish as we set 
PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY for this class
+        // even though the spied connection is stuck at the latch when closing
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.ACTIVE);
+
+        // Verify the spied object has been called once
+        verify(spy, times(1)).close();
+        // The spy is not closed because the real method was blocked by latch
+        assertFalse(spy.isClosed());
+        // connection is not closed as Phoenix HA does not close failover 
connections.
+        assertFalse(conn.isClosed());
+
+        try (Connection conn2 = createFailoverConnection()) {
+            doTestBasicOperationsWithConnection(conn2, tableName, haGroupName);
+            assertEquals(CLUSTERS.getUrl2(), // active connection is against 
the second cluster
+                    
conn2.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL());
+        }
+
+        latch.countDown();
+        conn.close();
+        // The CQS should be closed eventually.
+        waitFor(() -> {
+            try {
+                ((ConnectionQueryServicesImpl) cqs).checkClosed();
+                return false;
+            } catch (IllegalStateException e) {
+                LOG.info("CQS got closed as we get expected exception.", e);
+                return true;
+            }
+        }, 100, 10_000);
+    }
+
+    /**
+     * This is to make sure all Phoenix connections are closed when cluster 
becomes STANDBY.
+     *
+     * Test with many connections.
+     */
+    @Test
+    public void testAllWrappedConnectionsClosedAfterStandby() throws Exception 
{
+        short numberOfConnections = 10;
+        List<Connection> connectionList = new ArrayList<>(numberOfConnections);
+        for (short i = 0; i < numberOfConnections; i++) {
+            connectionList.add(createFailoverConnection());
+        }
+
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.ACTIVE);
+
+        for (short i = 0; i < numberOfConnections; i++) {
+            LOG.info("Asserting connection number {}", i);
+            FailoverPhoenixConnection conn = ((FailoverPhoenixConnection) 
connectionList.get(i));
+            assertFalse(conn.isClosed());
+            assertTrue(conn.getWrappedConnection().isClosed());
+        }
+    }
+
+    /**
+     * This is to make sure all Phoenix connections are closed when cluster 
becomes STANDBY.
+     *
+     * Test with many connections created concurrently by multiple threads.
+     */
+    @Test
+    public void testAllWrappedConnectionsClosedAfterStandbyAsync() throws 
Exception {
+        short numberOfThreads = 10;
+        // Test thread waits for half of connections to be created before 
triggering a failover
+        CountDownLatch latchToTransitRole = new CountDownLatch(numberOfThreads 
/  2);
+        // Clients wait for failover to finish before creating more connections
+        CountDownLatch latchToCreateMoreConnections = new CountDownLatch(1);
+        ExecutorService executor = 
Executors.newFixedThreadPool(numberOfThreads);
+        List<Future<Connection>> connections = new 
ArrayList<>(numberOfThreads);
+        for (short i = 0; i < numberOfThreads; i++) {
+            Future<Connection> future = executor.submit(() -> {
+                if (latchToTransitRole.getCount() <= 0) {
+                    latchToCreateMoreConnections.await();
+                }
+                Connection conn = createFailoverConnection();
+                latchToTransitRole.countDown();
+                return conn;
+            });
+            connections.add(future);
+        }
+
+        latchToTransitRole.await();
+        CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, 
ClusterRole.STANDBY);
+        latchToCreateMoreConnections.countDown();
+
+        waitFor(() -> {
+            for (Future<Connection> future : connections) {
+                if (!future.isDone()) {
+                    return false;
+                }
+                try {
+                    Connection conn = future.get(100, TimeUnit.MILLISECONDS);
+                    FailoverPhoenixConnection failoverConn = 
(FailoverPhoenixConnection) conn;
+                    if (!failoverConn.getWrappedConnection().isClosed()) {
+                        return false;
+                    }
+                } catch (Exception e) {
+                    LOG.info("Got exception when getting client connection; 
ignored", e);
+                }
+            }
+            return true;
+        }, 100, 60_000);
+    }
+
+    /**
+     * Test all Phoenix connections are closed when ZK is down and its role 
becomes STANDBY.
+     *
+     * This tests with many connections, as {@link 
#testAllWrappedConnectionsClosedAfterStandby()}.
+     * The difference is that, the ACTIVE cluster first shuts down and hence

Review Comment:
   nit: "and hence" what? (The comment seems to end mid-sentence.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to