gjacoby126 commented on code in PR #1430:
URL: https://github.com/apache/phoenix/pull/1430#discussion_r882048420


##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/FailoverPolicy.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.http.annotation.Immutable;
+import org.apache.phoenix.exception.FailoverSQLException;
+
+/**
+ * A failover policy defines how failover connection deals with existing 
connections in case of
+ * cluster role transition is detected.
+ *
+ * When an HBase cluster is not in ACTIVE role any more, all connections 
against it will get closed.
+ * To handle a failover event, the failover connection will use the failover 
policy for taking
+ * further actions. Those supported failover policies are defined here, but in 
future we can load
+ * the policy implemented and configured by user at runtime for each 
connection. The default policy
+ * requires that clients have to deal with the failover exception explicitly.
+ */
+@Immutable
+@FunctionalInterface
+public interface FailoverPolicy {
+    String PHOENIX_HA_FAILOVER_POLICY_ATTR = "phoenix.ha.failover.policy";
+    String PHOENIX_HA_FAILOVER_COUNT_ATTR = "phoenix.ha.failover.count";
+
+    /**
+     * Should try to failover by connecting to current ACTIVE HBase cluster 
(if any).
+     *
+     * @param exception the exception caught upon which this method is 
possible called
+     * @param failoverCount how many time so far this failover has been 
attempted
+     * @return true if caller should get a new phoenix connection against the 
ACTIVE HBase cluster
+     */
+    boolean shouldFailover(Exception exception, int failoverCount);
+
+    /**
+     * With this policy, clients have to deal with the failover exception 
explicitly if any.
+     *
+     * A {@link FailoverSQLException} exception will be thrown to the client 
when they try to use
+     * the closed connections. Specially, the high availability (HA) framework 
will not connect to
+     * the new ACTIVE cluster automatically for clients, but instead a client 
should:
+     *  - re-connect to this HA group, in which case it will get a new 
connection wrapping a Phoenix
+     *    connection to the newly ACTIVE cluster; OR
+     *  - call static method {@link 
FailoverPhoenixConnection#failover(Connection,long)} explicitly.
+     *    After that, it can create new Statement/ResultSet and retry the 
business logic.
+     * If neither cluster is ACTIVE, connect requests to this HA group will 
keep getting exception.
+     */
+    class ExplicitFailoverPolicy implements FailoverPolicy {
+        public static final String NAME = "explicit";
+        private static final ExplicitFailoverPolicy INSTANCE = new 
ExplicitFailoverPolicy();
+
+        @Override
+        public boolean shouldFailover(Exception e, int failoverCount) {
+            return false;
+        }
+
+        @Override
+        public String toString() {
+            return NAME;
+        }
+    }
+
+    /**
+     * With this, failover connection will wrap a new Phoenix connection to 
the new ACTIVE cluster.
+     *
+     * If the current operation (e.g. commit or create Statement) fails, the 
failover connection
+     * will try to wrap a new Phoenix connection according to this policy.  
After that, the client
+     * will be able to create new Statement/ResultSet created against this 
failover connection.
+     * While the HA group is failing over, the failover connection may not be 
able to failover by
+     * wrapping a new phoenix connection.  In that case, clients trying to use 
this the failover
+     * connection will get {@link FailoverSQLException} exception.
+     *
+     * The failover to ACTIVE cluster is best-effort; if it succeeds, clients 
do not notice target
+     * cluster changed. Some cases are not yet well supported with this 
failover policy, for e.g.
+     * after failover, the uncommitted mutations are not populated into the 
new connection.
+     *
+     * In case of {@link FailoverSQLException} exception, clients can still 
re-connect to this HA
+     * group by creating a new failover connection, OR call static method 
failover() explicitly.
+     */
+    @InterfaceStability.Unstable

Review Comment:
   It's pretty rare that Phoenix uses either InterfaceAudience or 
InterfaceStability. No objection to this being here, just FYI. 
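
   Separate from the annotation note: the javadoc above spells out the explicit-policy contract nicely. For readers of this review, a minimal client-side sketch of that contract (my own class name, table name and SQL, purely illustrative) could look like:

```java
import java.sql.Connection;
import java.sql.Statement;
import org.apache.phoenix.exception.FailoverSQLException;
import org.apache.phoenix.jdbc.FailoverPhoenixConnection;

// Sketch only: under the "explicit" policy the client either re-connects to the
// HA group or calls the static failover() and then retries its business logic.
class ExplicitFailoverClientExample {
    void upsertWithExplicitFailover(Connection conn) throws Exception {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("UPSERT INTO MY_TABLE VALUES (1, 'v')"); // hypothetical table
            conn.commit();
        } catch (FailoverSQLException e) {
            // Ask the HA framework to point this connection at the current ACTIVE
            // cluster, then recreate the Statement and retry once.
            FailoverPhoenixConnection.failover(conn, 10_000L);
            try (Statement retry = conn.createStatement()) {
                retry.execute("UPSERT INTO MY_TABLE VALUES (1, 'v')");
                conn.commit();
            }
        }
    }
}
```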



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java:
##########
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.exception.FailoverSQLException;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.monitoring.MetricType;
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * An implementation of JDBC connection which supports failover between two 
cluster in an HA group.
+ * <p>
+ * During its lifetime, a failover Phoenix connection could possibly connect 
to two HBase clusters
+ * in an HA group mutually exclusively.  It wraps and delegates the logic to a 
PhoenixConnection
+ * object.  At any given time, the wrapped connection should only talk to the 
ACTIVE HBase cluster
+ * in the HA group.
+ * <p>
+ * A failover connection will behave according to the given failover policy 
upon cluster role
+ * failover, especially when the current connected HBase cluster becomes 
STANDBY role from ACTIVE.
+ * The default behavior (aka default failover policy) will simply close the 
current connection and
+ * throw {@link org.apache.phoenix.exception.FailoverSQLException} exception 
to those clients who
+ * still use this connection after closing.
+ * <p>
+ * This class is not thread safe.
+ *
+ * @see HighAvailabilityGroup
+ * @see FailoverPolicy
+ */
+public class FailoverPhoenixConnection implements PhoenixMonitoredConnection {
+    /**
+     * Failover timeout interval after which failover operation will fail and 
clients can retry.
+     */
+    public static final String FAILOVER_TIMEOUT_MS_ATTR = 
"phoenix.ha.failover.timeout.ms";
+    public static final long FAILOVER_TIMEOUT_MS_DEFAULT = 10_000;
+    private static final Logger LOG = 
LoggerFactory.getLogger(FailoverPhoenixConnection.class);
+    /**
+     * Connection properties.
+     */
+    private final Properties properties;
+    /**
+     * High availability group.
+     */
+    private final HighAvailabilityGroup haGroup;
+    /**
+     * Failover policy, per connection.
+     */
+    private final FailoverPolicy policy;
+
+    /**
+     * True iff this connection has been closed by the client.
+     */
+    private boolean isClosed;
+    /**
+     * The wrapped PhoenixConnection object which could be re-assigned upon 
failover operation.
+     */
+    private PhoenixConnection connection;
+
+    /**
+     * Mutation metrics before failover to current connection.
+     */
+    private Map<String, Map<MetricType, Long>> previousMutationMetrics = new 
HashMap<>();
+    /**
+     * Read metrics before failover to current connection.
+     */
+    private Map<String, Map<MetricType, Long>> previousReadMetrics = new 
HashMap<>();
+
+    public FailoverPhoenixConnection(HighAvailabilityGroup haGroup, Properties 
properties)
+            throws SQLException {
+        this.properties = properties;
+        this.haGroup = haGroup;
+        this.policy = FailoverPolicy.get(properties);
+        this.isClosed = false;
+        this.connection = haGroup.connectActive(properties);
+    }
+
+    /**
+     * This is used for explicit failover request made by client.
+     * <p>
+     * It fails over to the current ACTIVE HBase cluster; if failover happens 
in between, this could
+     * possibly target this same cluster again.
+     * <p>
+     * TODO: If there are two ACTIVE clusters, then switch to the other ACTIVE 
cluster.

Review Comment:
   I assume this TODO is ok to defer?



##########
phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java:
##########
@@ -165,8 +165,30 @@ public enum MetricType {
     NUM_METADATA_LOOKUP_FAILURES("nmlf", "Number of Failed  metadata lookup 
calls",
                                  LogLevel.DEBUG,PLong.INSTANCE),
     TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS("tsistrc", "Time spent in RPC calls 
for systemTable lookup",
-                                         LogLevel.DEBUG,PLong.INSTANCE);
-       
+                                         LogLevel.DEBUG,PLong.INSTANCE),
+
+    //HA Related Metrics
+    HA_PARALLEL_COUNT_OPERATIONS_ACTIVE_CLUSTER("hpoac","Number of Operations 
to the active cluster",LogLevel.DEBUG,PLong.INSTANCE),
+    HA_PARALLEL_COUNT_OPERATIONS_STANDBY_CLUSTER("hposc","Number of Operations 
to the active cluster",LogLevel.DEBUG,PLong.INSTANCE),
+    HA_PARALLEL_COUNT_FAILED_OPERATIONS_ACTIVE_CLUSTER("hpfac","Number of 
Operations to the active cluster",LogLevel.DEBUG,PLong.INSTANCE),
+    HA_PARALLEL_COUNT_FAILED_OPERATIONS_STANDBY_CLUSTER("hpfsc","Number of 
Operations to the active cluster",LogLevel.DEBUG,PLong.INSTANCE),

Review Comment:
   nit: here again, description of standby metric says active
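
   For clarity, this is roughly what I'd expect the four descriptions to read (sketch only; short names and log levels unchanged):

```java
    //HA Related Metrics
    HA_PARALLEL_COUNT_OPERATIONS_ACTIVE_CLUSTER("hpoac", "Number of operations to the active cluster", LogLevel.DEBUG, PLong.INSTANCE),
    HA_PARALLEL_COUNT_OPERATIONS_STANDBY_CLUSTER("hposc", "Number of operations to the standby cluster", LogLevel.DEBUG, PLong.INSTANCE),
    HA_PARALLEL_COUNT_FAILED_OPERATIONS_ACTIVE_CLUSTER("hpfac", "Number of failed operations to the active cluster", LogLevel.DEBUG, PLong.INSTANCE),
    HA_PARALLEL_COUNT_FAILED_OPERATIONS_STANDBY_CLUSTER("hpfsc", "Number of failed operations to the standby cluster", LogLevel.DEBUG, PLong.INSTANCE),
```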



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java:
##########
@@ -0,0 +1,653 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.ParallelPhoenixUtil.FutureResult;
+import org.apache.phoenix.monitoring.MetricType;
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import static 
org.apache.phoenix.exception.SQLExceptionCode.CLASS_NOT_UNWRAPPABLE;
+
+public class ParallelPhoenixConnection implements PhoenixMonitoredConnection {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ParallelPhoenixConnection.class);
+
+    private final ParallelPhoenixContext context;
+    CompletableFuture<PhoenixConnection> futureConnection1;
+    CompletableFuture<PhoenixConnection> futureConnection2;
+
+    public ParallelPhoenixConnection(ParallelPhoenixContext context) throws 
SQLException {
+        this.context = context;
+        LOG.debug("First Url: {} Second Url: {}", 
context.getHaGroup().getGroupInfo().getUrl1(),
+                context.getHaGroup().getGroupInfo().getUrl2());
+        futureConnection1 = context.chainOnConn1(() -> 
getConnection(context.getHaGroup(),
+                context.getHaGroup().getGroupInfo().getUrl1(),
+                context.getProperties()));
+        futureConnection2 = context.chainOnConn2(() -> 
getConnection(context.getHaGroup(),
+                context.getHaGroup().getGroupInfo().getUrl2(),
+                context.getProperties()));
+    }
+
+    @VisibleForTesting
+    ParallelPhoenixConnection(ParallelPhoenixContext context, 
CompletableFuture<PhoenixConnection> futureConnection1, 
CompletableFuture<PhoenixConnection> futureConnection2) {
+        this.context = context;
+        this.futureConnection1 = futureConnection1;
+        this.futureConnection2 = futureConnection2;
+    }
+
+    private static PhoenixConnection getConnection(HighAvailabilityGroup 
haGroup, String url, Properties properties) {
+        try {
+            return haGroup.connectToOneCluster(url, properties);
+        } catch (SQLException exception) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace(String.format("Failed to get a connection for 
haGroup %s to %s", haGroup.toString(), url), exception);
+            }
+            throw new CompletionException(exception);
+        }
+    }
+
+    public CompletableFuture<PhoenixConnection> getFutureConnection1() {
+        return futureConnection1;
+    }
+
+    public CompletableFuture<PhoenixConnection> getFutureConnection2() {
+        return futureConnection2;
+    }
+
+    @VisibleForTesting
+    ParallelPhoenixContext getContext() {
+        return this.context;
+    }
+
+    Object runOnConnections(Function<PhoenixConnection, ?> function, boolean 
useMetrics) throws SQLException {
+        return ParallelPhoenixUtil.INSTANCE.runFutures(function, 
futureConnection1, futureConnection2, context, useMetrics);
+    }
+
+    PairOfSameType<Object> runOnConnectionsGetAll(Function<PhoenixConnection, 
?> function, boolean useMetrics) throws SQLException {
+        return ParallelPhoenixUtil.INSTANCE.runOnFuturesGetAll(function, 
futureConnection1, futureConnection2, context, useMetrics);
+    }
+
+    @Override
+    public ParallelPhoenixStatement createStatement() throws SQLException {
+        context.checkOpen();
+
+        Function<PhoenixConnection, PhoenixMonitoredStatement> function = (T) 
-> {
+            try {
+                return (PhoenixStatement) T.createStatement();
+            } catch (SQLException exception) {
+                throw new CompletionException(exception);
+            }
+        };
+
+        List<CompletableFuture<PhoenixMonitoredStatement>> futures = 
ParallelPhoenixUtil.INSTANCE.applyFunctionToFutures(function, futureConnection1,
+                futureConnection2, context, true);
+
+        Preconditions.checkState(futures.size() == 2);
+        CompletableFuture<PhoenixMonitoredStatement> statement1 = 
futures.get(0);
+        CompletableFuture<PhoenixMonitoredStatement> statement2 = 
futures.get(1);
+
+        //Ensure one statement is successful before returning
+        ParallelPhoenixUtil.INSTANCE.runFutures(futures, context, true);
+
+        return new ParallelPhoenixStatement(context, statement1, statement2);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql) throws SQLException {
+        context.checkOpen();
+
+        Function<PhoenixConnection, PhoenixMonitoredPreparedStatement> 
function = (T) -> {
+            try {
+                return (PhoenixMonitoredPreparedStatement) 
T.prepareStatement(sql);
+            } catch (SQLException exception) {
+                throw new CompletionException(exception);
+            }
+        };
+
+        List<CompletableFuture<PhoenixMonitoredPreparedStatement>> futures = 
ParallelPhoenixUtil.INSTANCE.applyFunctionToFutures(function, futureConnection1,
+                futureConnection2, context, true);
+
+        Preconditions.checkState(futures.size() == 2);
+        CompletableFuture<PhoenixMonitoredPreparedStatement> statement1 = 
futures.get(0);
+        CompletableFuture<PhoenixMonitoredPreparedStatement> statement2 = 
futures.get(1);
+
+        //Ensure one statement is successful before returning
+        ParallelPhoenixUtil.INSTANCE.runFutures(futures, context, true);
+
+        return new ParallelPhoenixPreparedStatement(this.context, statement1, 
statement2);
+    }
+
+    @Override
+    public CallableStatement prepareCall(String sql) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public String nativeSQL(String sql) throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean getAutoCommit() throws SQLException {
+        Function<PhoenixConnection, Boolean> function = (T) -> {
+            try {
+                return T.getAutoCommit();
+            } catch (SQLException exception) {
+                throw new CompletionException(exception);
+            }
+        };
+
+        return (boolean) runOnConnections(function, true);
+    }
+
+    @Override
+    public void setAutoCommit(boolean autoCommit) throws SQLException {
+        Function<PhoenixConnection, Void> function = (T) -> {
+            try {
+                T.setAutoCommit(autoCommit);
+                return null;
+            } catch (SQLException exception) {
+                throw new CompletionException(exception);
+            }
+        };
+
+        runOnConnections(function, true);
+    }
+
+    @Override
+    public void commit() throws SQLException {
+        Function<PhoenixConnection, Void> function = (T) -> {
+            try {
+                T.commit();
+                return null;
+            } catch (SQLException exception) {
+                throw new CompletionException(exception);
+            }
+        };
+
+        runOnConnections(function, true);
+    }
+
+    @Override
+    public void rollback() throws SQLException {
+
+    }
+
+    /**
+     * Close the underlying connections. Returns after any one of the 
underlying connections have
+     * closed successfully
+     *
+     * @throws SQLException if trying to close both the underlying connections 
encounters errors
+     */
+    @Override
+    public void close() throws SQLException {
+        context.close();
+        SQLException closeExp = null;
+
+        Supplier<Boolean> closeSupplier1 = getCloseSupplier(futureConnection1);
+        Supplier<Boolean> closeSupplier2 = getCloseSupplier(futureConnection2);
+
+        // We can have errors on the chain, we still need to close the 
underlying connections
+        // irrespective. Do a close when we're at the end of the chain and 
wait for any 1 to be
+        // successful. We need to track which future completed hence use 
FutureResult
+        // Enqueue a close operation at the end of the chain in the common 
ForkJoin Pool
+        CompletableFuture<FutureResult<CompletableFuture<Boolean>>> 
closeFuture1 =
+                context.getChainOnConn1().handle((obj, e) -> {
+                    return CompletableFuture.supplyAsync(closeSupplier1);
+                }).thenApply(t -> new FutureResult<>(t, 0));
+        CompletableFuture<FutureResult<CompletableFuture<Boolean>>> 
closeFuture2 =
+                context.getChainOnConn2().handle((obj, e) -> {
+                    return CompletableFuture.supplyAsync(closeSupplier2);
+                }).thenApply(t -> new FutureResult<>(t, 1));
+
+        FutureResult<CompletableFuture<Boolean>> result =
+                (FutureResult<CompletableFuture<Boolean>>) 
ParallelPhoenixUtil.INSTANCE
+                        .getAnyOfNonExceptionally(Arrays.asList(closeFuture1, 
closeFuture2), context);
+
+        try {
+            ParallelPhoenixUtil.INSTANCE.getFutureNoRetry(result.getResult(), 
context);
+            return;
+        } catch (Exception e) {
+            closeExp = new SQLException(e);
+        }
+        // The previous close encountered an exception try the other one
+        CompletableFuture<FutureResult<CompletableFuture<Boolean>>> 
otherFuture =
+                (result.getIndex() == 0) ? closeFuture2 : closeFuture1;
+
+        try {
+            FutureResult<CompletableFuture<Boolean>> otherResult =

Review Comment:
   Do we risk leaking connections if one connection closes but the other 
doesn't? Is there ever another opportunity to clean it up?
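
   If the answer is "yes, it can leak", one possibility (sketch only; `lingeringFutureConnection` is a stand-in for whichever future we did not end up waiting for) would be a fire-and-forget cleanup such as:

```java
// Best-effort close chained onto the connection future we abandoned, so a slow
// underlying PhoenixConnection is eventually closed instead of leaked.
lingeringFutureConnection.whenComplete((conn, err) -> {
    if (conn != null) {
        try {
            conn.close();
        } catch (SQLException e) {
            LOG.warn("Best-effort close of lagging connection failed", e);
        }
    }
});
```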



##########
phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixPreparedStatementTest.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.jdbc;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class ParallelPhoenixPreparedStatementTest {
+
+    ParallelPhoenixContext context;
+    CompletableFuture<PhoenixMonitoredPreparedStatement> future1;
+    CompletableFuture<PhoenixMonitoredPreparedStatement> future2;
+    PhoenixMonitoredPreparedStatement statement1;
+    PhoenixMonitoredPreparedStatement statement2;
+
+
+    ParallelPhoenixPreparedStatement phoenixPreparedStatement;
+
+    @Before
+    public void init() throws Exception {
+        context = new ParallelPhoenixContext(new Properties(), 
Mockito.mock(HighAvailabilityGroup.class),
+            
HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null);
+
+        statement1 = Mockito.mock(PhoenixMonitoredPreparedStatement.class);
+        statement2 = Mockito.mock(PhoenixMonitoredPreparedStatement.class);
+
+        future1 = CompletableFuture.completedFuture(statement1);
+        future2 = CompletableFuture.completedFuture(statement2);
+
+        phoenixPreparedStatement = new 
ParallelPhoenixPreparedStatement(context,future1,future2);
+    }
+
+    @Test
+    public void getStatement1() throws SQLException {
+        future1 = Mockito.mock(CompletableFuture.class);
+        future2 = Mockito.mock(CompletableFuture.class);
+        phoenixPreparedStatement = new 
ParallelPhoenixPreparedStatement(context,future1,future2);
+        assertEquals(future1, phoenixPreparedStatement.getStatement1());
+    }
+
+    @Test
+    public void getStatement2() throws SQLException {
+        future1 = Mockito.mock(CompletableFuture.class);
+        future2 = Mockito.mock(CompletableFuture.class);
+        phoenixPreparedStatement = new 
ParallelPhoenixPreparedStatement(context,future1,future2);
+        assertEquals(future2, phoenixPreparedStatement.getStatement2());
+    }
+
+    @Test
+    public void executeQuery() throws SQLException, ExecutionException, 
InterruptedException {
+        ResultSet mockResultSet1 = Mockito.mock(ResultSet.class);
+        ResultSet mockResultSet2 = Mockito.mock(ResultSet.class);
+
+        Mockito.when(statement1.executeQuery()).thenReturn(mockResultSet1);
+        Mockito.when(statement2.executeQuery()).thenReturn(mockResultSet2);
+
+        ResultSet rs = phoenixPreparedStatement.executeQuery();
+
+        //TODO: make this less dependant on sleep
+        Thread.sleep(5000);
+
+        Mockito.verify(statement1).executeQuery();
+        Mockito.verify(statement2).executeQuery();
+        ParallelPhoenixResultSet parallelRS = (ParallelPhoenixResultSet) rs;
+        assertEquals(mockResultSet1,parallelRS.getResultSetFuture1().get());
+        assertEquals(mockResultSet2,parallelRS.getResultSetFuture2().get());
+    }
+
+    @Test
+    public void setInt() throws SQLException, ExecutionException, 
InterruptedException {
+        phoenixPreparedStatement.setInt(1,2);
+
+        //TODO: make this less dependant on sleep
+        Thread.sleep(5000);

Review Comment:
   setInt is asynchronous?
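
   If it is, one way to drop the fixed sleeps (sketch only, assuming the per-connection chains are reachable from the test) would be to drain both chains before verifying:

```java
// Wait for both operation chains to finish instead of sleeping 5 seconds.
// (Needs java.util.concurrent.TimeUnit.)
context.getChainOnConn1().get(10, TimeUnit.SECONDS);
context.getChainOnConn2().get(10, TimeUnit.SECONDS);
Mockito.verify(statement1).setInt(1, 2);
Mockito.verify(statement2).setInt(1, 2);
```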



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.jdbc;
+
+import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.ACTIVE;
+import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.STANDBY;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An HighAvailabilityGroup provides a JDBC connection from given connection 
string and properties.
+ */
+enum HighAvailabilityPolicy {
+    FAILOVER {
+        @Override
+        public Connection provide(HighAvailabilityGroup haGroup, Properties 
info)
+                throws SQLException {
+            return new FailoverPhoenixConnection(haGroup, info);
+        }
+        @Override
+        void transitClusterRole(HighAvailabilityGroup haGroup, 
ClusterRoleRecord oldRecord,
+                ClusterRoleRecord newRecord) throws SQLException {
+            if (oldRecord.getRole1() == ACTIVE && newRecord.getRole1() == 
STANDBY) {
+                transitStandby(haGroup, oldRecord.getZk1());
+            }
+            if (oldRecord.getRole2() == ACTIVE && newRecord.getRole2() == 
STANDBY) {
+                transitStandby(haGroup, oldRecord.getZk2());
+            }
+            if (oldRecord.getRole1() != ACTIVE && newRecord.getRole1() == 
ACTIVE) {
+                transitActive(haGroup, oldRecord.getZk1());
+            }
+            if (oldRecord.getRole2() != ACTIVE && newRecord.getRole2() == 
ACTIVE) {
+                transitActive(haGroup, oldRecord.getZk2());
+            }
+        }
+        private void transitStandby(HighAvailabilityGroup haGroup, String 
zkUrl)
+                throws SQLException {
+            // Close connections when a previously ACTIVE HBase cluster 
becomes STANDBY.
+            LOG.info("Cluster {} becomes STANDBY in HA group {}, now close all 
its connections",
+                    zkUrl, haGroup.getGroupInfo());
+            ConnectionQueryServices cqs = null;
+            try {
+                cqs = PhoenixDriver.INSTANCE.getConnectionQueryServices(zkUrl,
+                         haGroup.getProperties());
+                cqs.closeAllConnections(new SQLExceptionInfo
+                        .Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
+                        .setMessage("Phoenix connection got closed due to 
failover")
+                        .setHaGroupInfo(haGroup.getGroupInfo().toString()));
+                LOG.info("Closed all connections to cluster {} for HA group 
{}", zkUrl,
+                        haGroup.getGroupInfo());
+            } finally {
+                if (cqs != null) {
+                    // CQS is closed but it is not invalidated from global 
cache in PhoenixDriver
+                    // so that any new connection will get error instead of 
creating a new CQS
+                    LOG.info("Closing CQS after cluster '{}' becomes STANDBY", 
zkUrl);
+                    cqs.close();
+                    LOG.info("Successfully closed CQS after cluster '{}' 
becomes STANDBY", zkUrl);
+                }
+            }
+        }
+        private void transitActive(HighAvailabilityGroup haGroup, String zkUrl)
+                throws SQLException {
+            // Invalidate CQS cache if any that has been closed but has not 
been cleared
+            PhoenixDriver.INSTANCE.invalidateCache(zkUrl, 
haGroup.getProperties());
+        }
+    },
+
+    PARALLEL {
+        @Override
+        public Connection provide(HighAvailabilityGroup haGroup, Properties 
info)
+                throws SQLException {
+            List<Boolean> executorCapacities = 
PhoenixHAExecutorServiceProvider.hasCapacity(info);
+            if (executorCapacities.contains(Boolean.TRUE)) {
+                ParallelPhoenixContext context =
+                        new ParallelPhoenixContext(info, haGroup,
+                                PhoenixHAExecutorServiceProvider.get(info), 
executorCapacities);
+                return new ParallelPhoenixConnection(context);
+            } else {
+                // TODO: Once we have operation/primary wait timeout use the 
same
+                // Give regular connection or a failover connection?
+                LOG.warn("Falling back to single phoenix connection due to 
resource constraints");
+                
GlobalClientMetrics.GLOBAL_HA_PARALLEL_CONNECTION_FALLBACK_COUNTER.increment();
+                return haGroup.connectActive(info);
+            }
+        }
+        @Override
+        void transitClusterRole(HighAvailabilityGroup haGroup, 
ClusterRoleRecord oldRecord,
+                ClusterRoleRecord newRecord) {
+            LOG.info("Cluster role changed for parallel HA policy.");

Review Comment:
   Why is this just a no-op? Because in PARALLEL both clusters are always treated as ACTIVE/ACTIVE? If so, why log that something changed?



##########
phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java:
##########
@@ -165,8 +165,30 @@ public enum MetricType {
     NUM_METADATA_LOOKUP_FAILURES("nmlf", "Number of Failed  metadata lookup 
calls",
                                  LogLevel.DEBUG,PLong.INSTANCE),
     TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS("tsistrc", "Time spent in RPC calls 
for systemTable lookup",
-                                         LogLevel.DEBUG,PLong.INSTANCE);
-       
+                                         LogLevel.DEBUG,PLong.INSTANCE),
+
+    //HA Related Metrics
+    HA_PARALLEL_COUNT_OPERATIONS_ACTIVE_CLUSTER("hpoac","Number of Operations 
to the active cluster",LogLevel.DEBUG,PLong.INSTANCE),
+    HA_PARALLEL_COUNT_OPERATIONS_STANDBY_CLUSTER("hposc","Number of Operations 
to the active cluster",LogLevel.DEBUG,PLong.INSTANCE),

Review Comment:
   nit: description of standby metric says active



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java:
##########
@@ -0,0 +1,653 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.ParallelPhoenixUtil.FutureResult;
+import org.apache.phoenix.monitoring.MetricType;
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import static 
org.apache.phoenix.exception.SQLExceptionCode.CLASS_NOT_UNWRAPPABLE;
+
+public class ParallelPhoenixConnection implements PhoenixMonitoredConnection {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ParallelPhoenixConnection.class);
+
+    private final ParallelPhoenixContext context;
+    CompletableFuture<PhoenixConnection> futureConnection1;
+    CompletableFuture<PhoenixConnection> futureConnection2;
+
+    public ParallelPhoenixConnection(ParallelPhoenixContext context) throws 
SQLException {
+        this.context = context;
+        LOG.debug("First Url: {} Second Url: {}", 
context.getHaGroup().getGroupInfo().getUrl1(),

Review Comment:
   Should this be TRACE because this is a pretty frequent path?
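
   i.e., something like:

```java
LOG.trace("First Url: {} Second Url: {}", context.getHaGroup().getGroupInfo().getUrl1(),
        context.getHaGroup().getGroupInfo().getUrl2());
```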



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java:
##########
@@ -0,0 +1,653 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.ParallelPhoenixUtil.FutureResult;
+import org.apache.phoenix.monitoring.MetricType;
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import static 
org.apache.phoenix.exception.SQLExceptionCode.CLASS_NOT_UNWRAPPABLE;
+
+public class ParallelPhoenixConnection implements PhoenixMonitoredConnection {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ParallelPhoenixConnection.class);
+
+    private final ParallelPhoenixContext context;
+    CompletableFuture<PhoenixConnection> futureConnection1;
+    CompletableFuture<PhoenixConnection> futureConnection2;
+
+    public ParallelPhoenixConnection(ParallelPhoenixContext context) throws 
SQLException {
+        this.context = context;
+        LOG.debug("First Url: {} Second Url: {}", 
context.getHaGroup().getGroupInfo().getUrl1(),
+                context.getHaGroup().getGroupInfo().getUrl2());
+        futureConnection1 = context.chainOnConn1(() -> 
getConnection(context.getHaGroup(),
+                context.getHaGroup().getGroupInfo().getUrl1(),
+                context.getProperties()));
+        futureConnection2 = context.chainOnConn2(() -> 
getConnection(context.getHaGroup(),
+                context.getHaGroup().getGroupInfo().getUrl2(),
+                context.getProperties()));
+    }
+
+    @VisibleForTesting
+    ParallelPhoenixConnection(ParallelPhoenixContext context, 
CompletableFuture<PhoenixConnection> futureConnection1, 
CompletableFuture<PhoenixConnection> futureConnection2) {
+        this.context = context;
+        this.futureConnection1 = futureConnection1;
+        this.futureConnection2 = futureConnection2;
+    }
+
+    private static PhoenixConnection getConnection(HighAvailabilityGroup 
haGroup, String url, Properties properties) {
+        try {
+            return haGroup.connectToOneCluster(url, properties);
+        } catch (SQLException exception) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace(String.format("Failed to get a connection for 
haGroup %s to %s", haGroup.toString(), url), exception);
+            }
+            throw new CompletionException(exception);
+        }
+    }
+
+    public CompletableFuture<PhoenixConnection> getFutureConnection1() {
+        return futureConnection1;
+    }
+
+    public CompletableFuture<PhoenixConnection> getFutureConnection2() {
+        return futureConnection2;
+    }
+
+    @VisibleForTesting
+    ParallelPhoenixContext getContext() {
+        return this.context;
+    }
+
+    Object runOnConnections(Function<PhoenixConnection, ?> function, boolean 
useMetrics) throws SQLException {
+        return ParallelPhoenixUtil.INSTANCE.runFutures(function, 
futureConnection1, futureConnection2, context, useMetrics);
+    }
+
+    PairOfSameType<Object> runOnConnectionsGetAll(Function<PhoenixConnection, 
?> function, boolean useMetrics) throws SQLException {
+        return ParallelPhoenixUtil.INSTANCE.runOnFuturesGetAll(function, 
futureConnection1, futureConnection2, context, useMetrics);
+    }
+
+    @Override
+    public ParallelPhoenixStatement createStatement() throws SQLException {
+        context.checkOpen();
+
+        Function<PhoenixConnection, PhoenixMonitoredStatement> function = (T) 
-> {
+            try {
+                return (PhoenixStatement) T.createStatement();
+            } catch (SQLException exception) {
+                throw new CompletionException(exception);
+            }
+        };
+
+        List<CompletableFuture<PhoenixMonitoredStatement>> futures = 
ParallelPhoenixUtil.INSTANCE.applyFunctionToFutures(function, futureConnection1,
+                futureConnection2, context, true);
+
+        Preconditions.checkState(futures.size() == 2);
+        CompletableFuture<PhoenixMonitoredStatement> statement1 = 
futures.get(0);
+        CompletableFuture<PhoenixMonitoredStatement> statement2 = 
futures.get(1);
+
+        //Ensure one statement is successful before returning
+        ParallelPhoenixUtil.INSTANCE.runFutures(futures, context, true);
+
+        return new ParallelPhoenixStatement(context, statement1, statement2);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql) throws SQLException {
+        context.checkOpen();
+
+        Function<PhoenixConnection, PhoenixMonitoredPreparedStatement> 
function = (T) -> {
+            try {
+                return (PhoenixMonitoredPreparedStatement) 
T.prepareStatement(sql);
+            } catch (SQLException exception) {
+                throw new CompletionException(exception);
+            }
+        };
+
+        List<CompletableFuture<PhoenixMonitoredPreparedStatement>> futures = 
ParallelPhoenixUtil.INSTANCE.applyFunctionToFutures(function, futureConnection1,
+                futureConnection2, context, true);
+
+        Preconditions.checkState(futures.size() == 2);
+        CompletableFuture<PhoenixMonitoredPreparedStatement> statement1 = 
futures.get(0);
+        CompletableFuture<PhoenixMonitoredPreparedStatement> statement2 = 
futures.get(1);
+
+        //Ensure one statement is successful before returning
+        ParallelPhoenixUtil.INSTANCE.runFutures(futures, context, true);

Review Comment:
   Let's say cluster1 (the ACTIVE one) creates its prepared statement successfully while cluster2's prepared statement fails. Then, when the statement actually runs, cluster1's query fails; by that point cluster2 is healthy again, but it holds a broken statement. What happens?



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java:
##########
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.exception.FailoverSQLException;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.monitoring.MetricType;
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * An implementation of JDBC connection which supports failover between two 
cluster in an HA group.
+ * <p>
+ * During its lifetime, a failover Phoenix connection could possibly connect 
to two HBase clusters
+ * in an HA group mutually exclusively.  It wraps and delegates the logic to a 
PhoenixConnection
+ * object.  At any given time, the wrapped connection should only talk to the 
ACTIVE HBase cluster
+ * in the HA group.
+ * <p>
+ * A failover connection will behave according to the given failover policy 
upon cluster role
+ * failover, especially when the current connected HBase cluster becomes 
STANDBY role from ACTIVE.
+ * The default behavior (aka default failover policy) will simply close the 
current connection and
+ * throw {@link org.apache.phoenix.exception.FailoverSQLException} exception 
to those clients who
+ * still use this connection after closing.
+ * <p>
+ * This class is not thread safe.
+ *
+ * @see HighAvailabilityGroup
+ * @see FailoverPolicy
+ */
+public class FailoverPhoenixConnection implements PhoenixMonitoredConnection {
+    /**
+     * Failover timeout interval after which failover operation will fail and 
clients can retry.
+     */
+    public static final String FAILOVER_TIMEOUT_MS_ATTR = 
"phoenix.ha.failover.timeout.ms";
+    public static final long FAILOVER_TIMEOUT_MS_DEFAULT = 10_000;
+    private static final Logger LOG = 
LoggerFactory.getLogger(FailoverPhoenixConnection.class);
+    /**
+     * Connection properties.
+     */
+    private final Properties properties;
+    /**
+     * High availability group.
+     */
+    private final HighAvailabilityGroup haGroup;
+    /**
+     * Failover policy, per connection.
+     */
+    private final FailoverPolicy policy;
+
+    /**
+     * True iff this connection has been closed by the client.
+     */
+    private boolean isClosed;
+    /**
+     * The wrapped PhoenixConnection object which could be re-assigned upon 
failover operation.
+     */
+    private PhoenixConnection connection;
+
+    /**
+     * Mutation metrics before failover to current connection.
+     */
+    private Map<String, Map<MetricType, Long>> previousMutationMetrics = new 
HashMap<>();
+    /**
+     * Read metrics before failover to current connection.
+     */
+    private Map<String, Map<MetricType, Long>> previousReadMetrics = new 
HashMap<>();
+
+    public FailoverPhoenixConnection(HighAvailabilityGroup haGroup, Properties 
properties)
+            throws SQLException {
+        this.properties = properties;
+        this.haGroup = haGroup;
+        this.policy = FailoverPolicy.get(properties);
+        this.isClosed = false;
+        this.connection = haGroup.connectActive(properties);
+    }
+
+    /**
+     * This is used for explicit failover request made by client.
+     * <p>
+     * It fails over to the current ACTIVE HBase cluster; if failover happens 
in between, this could
+     * possibly target this same cluster again.
+     * <p>
+     * TODO: If there are two ACTIVE clusters, then switch to the other ACTIVE 
cluster.
+     *
+     * @param conn      if not of FailoverPhoenixConnection type, throw 
illegal argument exception
+     * @param timeoutMs timeout in milliseconds to failover to current active 
cluster
+     * @throws SQLException if fails to failover
+     */
+    public static void failover(Connection conn, long timeoutMs) throws 
SQLException {
+        Preconditions.checkNotNull(conn, "Connection to failover must not be 
null!");
+        FailoverPhoenixConnection failoverConnection = 
conn.unwrap(FailoverPhoenixConnection.class);
+        if (failoverConnection == null) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE)
+                    .setMessage("Connection is not a valid 
FailoverPhoenixConnection object")
+                    .build()
+                    .buildException();
+        }
+        failoverConnection.failover(timeoutMs);
+    }
+
+    /**
+     * Helper method to merge two metrics map into one.
+     * <p>
+     * Shallow copy the first one, and deep copy the second one.
+     * An optimization is that, it will return the shallow directly if the 
deep is empty.
+     */
+    private static Map<String, Map<MetricType, Long>> mergeMetricMaps(
+            Map<String, Map<MetricType, Long>> shallow, Map<String, 
Map<MetricType, Long>> deep) {
+        if (deep.isEmpty()) {
+            return shallow;
+        }
+
+        Map<String, Map<MetricType, Long>> metrics = new HashMap<>(shallow);
+        deep.forEach((k, v) -> {
+            metrics.putIfAbsent(k, new HashMap<>());
+            Map<MetricType, Long> map = metrics.get(k);
+            v.forEach((kk, vv) -> {
+                Long value = map.getOrDefault(kk, 0L);
+                map.put(kk, value + vv);
+            });
+        });
+        return metrics;
+    }
+
+    /**
+     * Failover this connection by switching underlying phoenix connection to 
the ACTIVE one.
+     * <p>
+     * If the current phoenix connection is already connecting to ACTIVE 
cluster, this is a no-op.
+     *
+     * @param timeoutMs timeout in ms waiting for a new connection to be 
established.
+     * @throws SQLException if fails to failover
+     */
+    @VisibleForTesting
+    void failover(long timeoutMs) throws SQLException {
+        checkConnection();
+
+        if (haGroup.isActive(connection)) {
+            LOG.info("Connection {} is against ACTIVE cluster in HA group {}; 
skip failing over.",
+                    connection.getURL(), haGroup.getGroupInfo().getName());
+            return;
+        }
+
+        PhoenixConnection newConn = null;
+        SQLException cause = null;
+        final long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        while (newConn == null &&
+                EnvironmentEdgeManager.currentTimeMillis() < startTime + 
timeoutMs) {
+            try {
+                newConn = haGroup.connectActive(properties);
+            } catch (SQLException e) {
+                cause = e;
+                LOG.info("Got exception when trying to connect to active 
cluster.", e);
+                try {
+                    Thread.sleep(100); // TODO: be smart than this
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new SQLException("Got interrupted waiting for 
connection failover", e);
+                }
+            }
+        }
+        if (newConn == null) {
+            throw new FailoverSQLException("Can not failover connection",
+                    haGroup.getGroupInfo().toString(), cause);
+        }
+
+        final PhoenixConnection oldConn = connection;
+        connection = newConn;
+        if (oldConn != null) {
+            // aggregate metrics
+            previousMutationMetrics = oldConn.getMutationMetrics();
+            previousReadMetrics = oldConn.getReadMetrics();
+            oldConn.clearMetrics();
+
+            // close old connection
+            if (!oldConn.isClosed()) {
+                // TODO:  what happens to in-flight edits/mutations?
+                // Can we copy into the new connection we do not allow this 
failover?
+                // MutationState state = oldConn.getMutationState();

Review Comment:
   We'd want to look _really_ carefully at the thread-safety implications of taking the uncommitted mutation state out of one connection and putting it into another. Definitely seems good to defer this TODO for now.



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java:
##########
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.exception.FailoverSQLException;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.monitoring.MetricType;
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+/**
+ * An implementation of JDBC connection which supports failover between two 
cluster in an HA group.
+ * <p>
+ * During its lifetime, a failover Phoenix connection could possibly connect 
to two HBase clusters
+ * in an HA group mutually exclusively.  It wraps and delegates the logic to a 
PhoenixConnection
+ * object.  At any given time, the wrapped connection should only talk to the 
ACTIVE HBase cluster
+ * in the HA group.
+ * <p>
+ * A failover connection will behave according to the given failover policy 
upon cluster role
+ * failover, especially when the current connected HBase cluster becomes 
STANDBY role from ACTIVE.
+ * The default behavior (aka default failover policy) will simply close the 
current connection and
+ * throw {@link org.apache.phoenix.exception.FailoverSQLException} exception 
to those clients who
+ * still use this connection after closing.
+ * <p>
+ * This class is not thread safe.
+ *
+ * @see HighAvailabilityGroup
+ * @see FailoverPolicy
+ */
+public class FailoverPhoenixConnection implements PhoenixMonitoredConnection {
+    /**
+     * Failover timeout interval after which failover operation will fail and 
clients can retry.
+     */
+    public static final String FAILOVER_TIMEOUT_MS_ATTR = 
"phoenix.ha.failover.timeout.ms";
+    public static final long FAILOVER_TIMEOUT_MS_DEFAULT = 10_000;
+    private static final Logger LOG = 
LoggerFactory.getLogger(FailoverPhoenixConnection.class);
+    /**
+     * Connection properties.
+     */
+    private final Properties properties;
+    /**
+     * High availability group.
+     */
+    private final HighAvailabilityGroup haGroup;
+    /**
+     * Failover policy, per connection.
+     */
+    private final FailoverPolicy policy;
+
+    /**
+     * True iff this connection has been closed by the client.
+     */
+    private boolean isClosed;
+    /**
+     * The wrapped PhoenixConnection object which could be re-assigned upon 
failover operation.
+     */
+    private PhoenixConnection connection;
+
+    /**
+     * Mutation metrics before failover to current connection.
+     */
+    private Map<String, Map<MetricType, Long>> previousMutationMetrics = new 
HashMap<>();
+    /**
+     * Read metrics before failover to current connection.
+     */
+    private Map<String, Map<MetricType, Long>> previousReadMetrics = new 
HashMap<>();
+
+    public FailoverPhoenixConnection(HighAvailabilityGroup haGroup, Properties 
properties)
+            throws SQLException {
+        this.properties = properties;
+        this.haGroup = haGroup;
+        this.policy = FailoverPolicy.get(properties);
+        this.isClosed = false;
+        this.connection = haGroup.connectActive(properties);
+    }
+
+    /**
+     * This is used for explicit failover request made by client.
+     * <p>
+     * It fails over to the current ACTIVE HBase cluster; if failover happens 
in between, this could
+     * possibly target this same cluster again.
+     * <p>
+     * TODO: If there are two ACTIVE clusters, then switch to the other ACTIVE 
cluster.
+     *
+     * @param conn      if not of FailoverPhoenixConnection type, throw 
illegal argument exception
+     * @param timeoutMs timeout in milliseconds to failover to current active 
cluster
+     * @throws SQLException if fails to failover
+     */
+    public static void failover(Connection conn, long timeoutMs) throws 
SQLException {
+        Preconditions.checkNotNull(conn, "Connection to failover must not be 
null!");
+        FailoverPhoenixConnection failoverConnection = 
conn.unwrap(FailoverPhoenixConnection.class);
+        if (failoverConnection == null) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE)
+                    .setMessage("Connection is not a valid 
FailoverPhoenixConnection object")
+                    .build()
+                    .buildException();
+        }
+        failoverConnection.failover(timeoutMs);
+    }
+
+    /**
+     * Helper method to merge two metrics maps into one.
+     * <p>
+     * Shallow copy the first one, and deep copy the second one.
+     * As an optimization, it returns the shallow map directly if the deep map is empty.
+     */
+    private static Map<String, Map<MetricType, Long>> mergeMetricMaps(
+            Map<String, Map<MetricType, Long>> shallow, Map<String, 
Map<MetricType, Long>> deep) {
+        if (deep.isEmpty()) {
+            return shallow;
+        }
+
+        Map<String, Map<MetricType, Long>> metrics = new HashMap<>(shallow);
+        deep.forEach((k, v) -> {
+            metrics.putIfAbsent(k, new HashMap<>());
+            Map<MetricType, Long> map = metrics.get(k);
+            v.forEach((kk, vv) -> {
+                Long value = map.getOrDefault(kk, 0L);
+                map.put(kk, value + vv);
+            });
+        });
+        return metrics;
+    }
+
+    /**
+     * Failover this connection by switching underlying phoenix connection to 
the ACTIVE one.
+     * <p>
+     * If the current phoenix connection is already connected to the ACTIVE cluster, this is a no-op.
+     *
+     * @param timeoutMs timeout in ms waiting for a new connection to be 
established.
+     * @throws SQLException if fails to failover
+     */
+    @VisibleForTesting
+    void failover(long timeoutMs) throws SQLException {
+        checkConnection();
+
+        if (haGroup.isActive(connection)) {
+            LOG.info("Connection {} is against ACTIVE cluster in HA group {}; 
skip failing over.",
+                    connection.getURL(), haGroup.getGroupInfo().getName());
+            return;
+        }
+
+        PhoenixConnection newConn = null;
+        SQLException cause = null;
+        final long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        while (newConn == null &&
+                EnvironmentEdgeManager.currentTimeMillis() < startTime + 
timeoutMs) {
+            try {
+                newConn = haGroup.connectActive(properties);
+            } catch (SQLException e) {
+                cause = e;
+                LOG.info("Got exception when trying to connect to active 
cluster.", e);
+                try {
+                    Thread.sleep(100); // TODO: be smarter than this
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new SQLException("Got interrupted waiting for 
connection failover", e);
+                }
+            }
+        }
+        if (newConn == null) {
+            throw new FailoverSQLException("Can not failover connection",
+                    haGroup.getGroupInfo().toString(), cause);
+        }
+
+        final PhoenixConnection oldConn = connection;
+        connection = newConn;
+        if (oldConn != null) {
+            // aggregate metrics
+            previousMutationMetrics = oldConn.getMutationMetrics();
+            previousReadMetrics = oldConn.getReadMetrics();
+            oldConn.clearMetrics();
+
+            // close old connection
+            if (!oldConn.isClosed()) {
+                // TODO:  what happens to in-flight edits/mutations?
+                // Can we copy it into the new connection, or should we not allow this failover?
+                // MutationState state = oldConn.getMutationState();
+                try {
+                    oldConn.close(new SQLExceptionInfo
+                            .Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
+                            .setMessage("Phoenix connection got closed due to 
failover")
+                            .setHaGroupInfo(haGroup.getGroupInfo().toString())
+                            .build()
+                            .buildException());
+                } catch (SQLException e) {
+                    LOG.error("Failed to close old connection after failover: 
{}", e.getMessage());
+                    LOG.info("Full stack when closing old connection after 
failover", e);
+                }
+            }
+        }
+        LOG.info("Connection {} failed over to {}", haGroup.getGroupInfo(), 
connection.getURL());
+    }
+
+    /**
+     * Connection can not be null before any operation.
+     * <p>
+     * Here when connection is non-null, we do not need to check if the 
wrapped connection is open.
+     * The reason is that each individual delegated call on the wrapped 
connection will internally
+     * check open itself, see {@link PhoenixConnection#checkOpen()}.
+     *
+     * @throws SQLException if the current wrapped phoenix connection is not in a valid state
+     */
+    private void checkConnection() throws SQLException {
+        if (isClosed) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CONNECTION_CLOSED)
+                    .setHaGroupInfo(haGroup.getGroupInfo().toString())
+                    .build()
+                    .buildException();
+        }
+        if (connection == null) {
+            throw new SQLExceptionInfo
+                    .Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
+                    .setMessage("Connection has not been established to ACTIVE 
HBase cluster")
+                    .setHaGroupInfo(haGroup.getGroupInfo().toString())
+                    .build()
+                    .buildException();
+        }
+    }
+
+    @Override
+    public void close() throws SQLException {
+        if (isClosed()) {
+            return;
+        }
+
+        try {
+            connection.close();
+            connection.clearMetrics();
+        } finally {
+            previousMutationMetrics.clear();
+            previousReadMetrics.clear();
+            isClosed = true;
+        }
+    }
+
+    @Override
+    public boolean isClosed() {
+        return isClosed;
+    }
+
+    //// metrics for monitoring methods
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        if (!iface.isInstance(this)) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE)
+                    .setMessage(getClass().getName() + " not unwrappable from 
" + iface.getName())
+                    .build()
+                    .buildException();
+        }
+        return (T) this;
+    }
+
+    @Override
+    public Map<String, Map<MetricType, Long>> getMutationMetrics() {
+        return mergeMetricMaps(connection.getMutationMetrics(), 
previousMutationMetrics);
+    }
+
+    @Override
+    public Map<String, Map<MetricType, Long>> getReadMetrics() {
+        return mergeMetricMaps(connection.getReadMetrics(), 
previousReadMetrics);
+    }
+
+    @Override
+    public boolean isRequestLevelMetricsEnabled() {
+        return connection != null && connection.isRequestLevelMetricsEnabled();
+    }
+
+    @Override
+    public void clearMetrics() {
+        previousMutationMetrics.clear();
+        previousReadMetrics.clear();
+        if (connection != null) {
+            connection.clearMetrics();
+        }
+    }
+
+    //// Wrapping phoenix connection operations
+
+    /**
+     * This is a utility method that helps wrap a method call to the phoenix connection.
+     *
+     * @param s   the supplier which returns a value and may throw SQLException
+     * @param <T> type of the returned object by the supplier
+     * @return the object returned by the supplier if any
+     * @throws SQLException exception when getting object from the supplier
+     */
+    @VisibleForTesting
+    <T> T wrapActionDuringFailover(SupplierWithSQLException<T> s) throws 
SQLException {
+        checkConnection();
+        final long timeoutMs = 
Long.parseLong(properties.getProperty(FAILOVER_TIMEOUT_MS_ATTR,
+                String.valueOf(FAILOVER_TIMEOUT_MS_DEFAULT)));
+        int failoverCount = 0;
+        while (true) {
+            try {
+                return s.get();
+            } catch (SQLException e) {
+                if (policy.shouldFailover(e, ++failoverCount)) {
+                    failover(timeoutMs);
+                } else {
+                    throw new SQLException(
+                            String.format("Error on operation with failover 
policy %s", policy), e);
+                }
+            }
+        }
+    }
+
+    @VisibleForTesting
+    void wrapActionDuringFailover(RunWithSQLException runnable) throws 
SQLException {
+        wrapActionDuringFailover(() -> {
+            runnable.run();
+            return null;
+        });
+    }
+
+    @Override
+    public void commit() throws SQLException {
+        wrapActionDuringFailover(() -> connection.commit());

Review Comment:
   Really clever way of wrapping the Connection API...I see now why a Java 7 
port wasn't feasible.
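   As a side note for readers of this thread, the compactness comes from lambdas targeting a
@FunctionalInterface supplier. A minimal sketch of the idea (simplified, hypothetical names,
not the PR's actual classes):

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;

public class WrappingSketch {
    // Hypothetical functional interface, analogous in spirit to the PR's supplier type.
    @FunctionalInterface
    interface SqlSupplier<T> {
        T get() throws SQLException;
    }

    // Wraps a JDBC call; a real implementation would decide whether to fail over
    // the underlying connection before retrying the same lambda.
    static <T> T wrap(SqlSupplier<T> action) throws SQLException {
        try {
            return action.get();
        } catch (SQLException first) {
            // a failover step would swap the wrapped connection here
            return action.get();
        }
    }

    static ResultSet query(Connection conn, String sql) throws SQLException {
        // With lambdas, every delegated JDBC call is a one-liner
        // (statement closing elided for brevity).
        return wrap(() -> conn.createStatement().executeQuery(sql));
    }
}
```

   On Java 7 every delegated Connection method would need its own anonymous inner class,
which is the part that makes a straight port impractical.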



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.jdbc;
+
+import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.ACTIVE;
+import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.STANDBY;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A HighAvailabilityGroup provides a JDBC connection from a given connection string and properties.
+ */
+enum HighAvailabilityPolicy {
+    FAILOVER {
+        @Override
+        public Connection provide(HighAvailabilityGroup haGroup, Properties 
info)
+                throws SQLException {
+            return new FailoverPhoenixConnection(haGroup, info);
+        }
+        @Override
+        void transitClusterRole(HighAvailabilityGroup haGroup, 
ClusterRoleRecord oldRecord,
+                ClusterRoleRecord newRecord) throws SQLException {
+            if (oldRecord.getRole1() == ACTIVE && newRecord.getRole1() == 
STANDBY) {

Review Comment:
   Should there be some protection against a newRecord with {STANDBY, STANDBY} (or 
{ACTIVE, ACTIVE}) in a cluster pair in failover mode?
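   For illustration, one possible shape of such a guard (hypothetical names, and assuming the
transition handler should reject these records at all; an all-STANDBY intermediate state may
well be legal during a planned failover):

```java
public class RolePairGuardSketch {
    enum Role { ACTIVE, STANDBY, OFFLINE }

    // Rejects role pairs that leave a failover group without exactly one ACTIVE cluster.
    static void checkFailoverPair(Role role1, Role role2) {
        if (role1 == Role.ACTIVE && role2 == Role.ACTIVE) {
            throw new IllegalStateException("Both clusters are ACTIVE in a failover pair");
        }
        if (role1 != Role.ACTIVE && role2 != Role.ACTIVE) {
            throw new IllegalStateException("No ACTIVE cluster in a failover pair");
        }
    }
}
```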



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixResultSet.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.jdbc;
+
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.monitoring.MetricType;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Function;
+
+import static 
org.apache.phoenix.exception.SQLExceptionCode.CLASS_NOT_UNWRAPPABLE;
+
+public class ParallelPhoenixResultSet extends DelegateResultSet implements 
PhoenixMonitoredResultSet {

Review Comment:
   What's the distinction between this and the 
ParallelPhoenixNullComparingResultSet? When would you use one over the other?



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.monitoring.MetricType;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static 
org.apache.phoenix.jdbc.ParallelPhoenixUtil.PHOENIX_HA_PARALLEL_OPERATION_TIMEOUT_ATTRIB;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_CONNECTION_CREATED_COUNTER;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_CONNECTION_ERROR_COUNTER;
+import static org.apache.phoenix.query.QueryServices.AUTO_COMMIT_ATTRIB;
+
+/**
+ * ParallelPhoenixContext holds the state of hte execution of a parallel 
phoenix operation as well as metrics.

Review Comment:
   tiny nit: typo on "the"



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAExecutorServiceProvider.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL1_TASK_END_TO_END_TIME;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL1_TASK_EXECUTED_COUNTER;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL1_TASK_EXECUTION_TIME;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL1_TASK_QUEUE_WAIT_TIME;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL1_TASK_REJECTED_COUNTER;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_END_TO_END_TIME;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_EXECUTED_COUNTER;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_EXECUTION_TIME;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_QUEUE_WAIT_TIME;
+import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_REJECTED_COUNTER;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
+import 
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+
+/**
+ * Provides a bounded and configurable executor service for {@link 
ParallelPhoenixConnection} and
+ * related infra. Provides a lazily initialized singleton executor service to 
be used for all
+ * ParallelPhoenixConnections. Also provides visibility into the capacity of 
ExecutorService
+ * {@link PhoenixHAExecutorServiceProvider#hasCapacity(Properties)}
+ */
+public class PhoenixHAExecutorServiceProvider {
+
+    public static final String HA_MAX_POOL_SIZE = "phoenix.ha.max.pool.size";
+    public static final String DEFAULT_HA_MAX_POOL_SIZE = "30";
+    public static final String HA_MAX_QUEUE_SIZE = "phoenix.ha.max.queue.size";
+    public static final String DEFAULT_HA_MAX_QUEUE_SIZE = "300";
+    public static final String HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD =
+            "phoenix.ha.threadpool.queue.backoff.threshold";
+    public static final String DEFAULT_HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD = 
"0.9";
+
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(PhoenixHAExecutorServiceProvider.class);
+
+    // Can make configurable if needed
+    private static final int KEEP_ALIVE_TIME_SECONDS = 120;
+
+    private static volatile List<ExecutorService> INSTANCE = null;
+
+    private PhoenixHAExecutorServiceProvider() {
+    }
+
+    public static List<ExecutorService> get(Properties properties) {
+        if (INSTANCE == null) {
+            synchronized (PhoenixHAExecutorServiceProvider.class) {
+                if (INSTANCE == null) {
+                    INSTANCE = initThreadPool(properties);
+                }
+            }
+        }
+        return INSTANCE;
+    }
+
+    @VisibleForTesting
+    static synchronized void resetExecutor() {
+        INSTANCE = null;
+    }
+
+    /**
+     * Checks if the underlying executorServices have sufficient available 
capacity based on
+     * {@link #HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD}. Monitors the capacity 
of the blockingqueues
+     * linked with the executor services.
+     * @param properties
+     * @return true if queue is less than {@link 
#HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD} full
+     */
+    public static List<Boolean> hasCapacity(Properties properties) {
+        if (INSTANCE == null) {
+            return ImmutableList.of(Boolean.TRUE, Boolean.TRUE);
+        }
+        double backoffThreshold =
+                
Double.parseDouble(properties.getProperty(HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD,
+                    DEFAULT_HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD));
+        int i = 0;
+        List<Boolean> executorCapacities = new ArrayList<>();
+        for (ExecutorService executor : INSTANCE) {
+            double queueSize = ((ThreadPoolExecutor) 
executor).getQueue().size();
+            double queueRemainingCapacity =
+                    ((ThreadPoolExecutor) 
executor).getQueue().remainingCapacity();
+            double queueCapacity = queueSize + queueRemainingCapacity;
+            boolean hasCapacity = ((queueSize / queueCapacity) < 
backoffThreshold);
+            if (hasCapacity) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.info("PhoenixHAExecutorServiceProvider 
ThreadPoolExecutor[" + i

Review Comment:
   isDebugEnabled() wrapping an info() call. Also, won't this log very frequently even at 
DEBUG level?
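   For reference, a small self-contained sketch of the matching guard (note that slf4j's
{} placeholders already defer argument formatting, so the guard is optional when the message
uses placeholders rather than string concatenation):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogGuardSketch {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogGuardSketch.class);

    static void reportQueue(int pool, int size, int remaining) {
        // Guard level and log level should match; debug() pairs with isDebugEnabled().
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("ThreadPoolExecutor[{}] queueSize={} remainingCapacity={}",
                    pool, size, remaining);
        }
    }
}
```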



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java:
##########
@@ -0,0 +1,871 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import com.sun.istack.NotNull;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
+import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
+import 
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.phoenix.util.JDBCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION;
+
+/**
+ * A high availability (HA) group is an association between a pair of HBase 
clusters, a group of
+ * clients, and an HA policy.
+ * <p>
+ * This class is thread safe. Multiple threads may access an instance of this 
class, including
+ * multiple clients that call init in order to create a connection, and two cluster role managers
+ * that watch node changes in ZooKeeper.
+ * <p>
+ * The lifecycle of an HA group is confined to the global cache, meaning clients can get an
+ * instance from the cache but cannot construct or close an HA group instance.  The reason is
+ * that an HA group is a resource shared by many clients.  A client closing it, intentionally or
+ * accidentally, would impact other connections in this group with unexpected behavior.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class HighAvailabilityGroup {
+    public static final String PHOENIX_HA_ATTR_PREFIX = "phoenix.ha.";
+    public static final String PHOENIX_HA_GROUP_ATTR = PHOENIX_HA_ATTR_PREFIX 
+ "group.name";
+    /**
+     * Should we fall back to single cluster when cluster role record is 
missing?
+     */
+    public static final String PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY 
=
+            PHOENIX_HA_ATTR_PREFIX + "fallback.enabled";
+    public static final String 
PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_DEFAULT =
+            String.valueOf(Boolean.TRUE);
+    /**
+     * The single-cluster connection URL when it needs to fall back.
+     */
+    public static final String PHOENIX_HA_FALLBACK_CLUSTER_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "fallback.cluster";
+    public static final String PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE =
+            "phoenix" + ZKPaths.PATH_SEPARATOR + "ha";
+
+    public static final String PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.connection.timeout.ms";
+    public static final int PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT = 
4_000;
+    public static final String PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.session.timeout.ms";
+    public static final int PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_DEFAULT = 4_000;
+    public static final String PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.retry.base.sleep.ms";
+
+    public static final int PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_DEFAULT = 1000;
+    public static final String PHOENIX_HA_ZK_RETRY_MAX_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.retry.max";
+    public static final int PHOENIX_HA_ZK_RETRY_MAX_DEFAULT = 5;
+    public static final String PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.retry.max.sleep.ms";
+    public static final int PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_DEFAULT = 10_000;
+    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(
+            PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_DEFAULT,
+            PHOENIX_HA_ZK_RETRY_MAX_DEFAULT,
+            PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_DEFAULT);
+
+    public static final String PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "transition.timeout.ms";
+    public static final long PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT = 5 * 60 
* 1000; // 5 mins
+
+    static final Logger LOG = 
LoggerFactory.getLogger(HighAvailabilityGroup.class);
+    @VisibleForTesting
+    static final Map<HAGroupInfo, HighAvailabilityGroup> GROUPS = new 
ConcurrentHashMap<>();
+    @VisibleForTesting
+    static final Cache<HAGroupInfo, Boolean> MISSING_CRR_GROUPS_CACHE = 
CacheBuilder.newBuilder()
+            .expireAfterWrite(PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT, 
TimeUnit.MILLISECONDS)
+            .build();
+    /**
+     * The Curator client cache, one client instance per cluster.
+     */
+    @VisibleForTesting
+    static final Cache<String, CuratorFramework> CURATOR_CACHE = 
CacheBuilder.newBuilder()
+            .expireAfterAccess(DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION, 
TimeUnit.MILLISECONDS)
+            .removalListener((notification) ->
+                    ((CuratorFramework) 
Objects.requireNonNull(notification.getValue())).close())
+            .build();
+    /**
+     * High availability group info.
+     */
+    private final HAGroupInfo info;
+    /**
+     * Client properties used to initialize this HA group.
+     */
+    private final Properties properties;
+    /**
+     * Executor service for the two role managers.
+     */
+    private final ExecutorService roleManagerExecutor = 
Executors.newFixedThreadPool(2,
+            new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("phoenixHAGroup-%d").build());
+    /**
+     * The count down latch to make sure at least one role manager has pulled 
data from ZK.
+     */
+    private final CountDownLatch roleManagerLatch = new CountDownLatch(1);
+    /**
+     * Pair of role managers for watching cluster role records from the two ZK 
clusters.
+     */
+    private final AtomicReference<PairOfSameType<HAClusterRoleManager>> 
roleManagers
+            = new AtomicReference<>();
+    /**
+     * Executor for applying the cluster role to this HA group.
+     */
+    private final ExecutorService nodeChangedExecutor = 
Executors.newFixedThreadPool(1);
+    /**
+     * Current cluster role record for this HA group.
+     */
+    private volatile ClusterRoleRecord roleRecord;
+    /**
+     * State of this HA group.
+     */
+    private volatile State state = State.UNINITIALIZED;
+
+    /**
+     * Private constructor.
+     * <p>
+     * To get an instance, please call {@link 
HighAvailabilityGroup#get(String, Properties)}.
+     */
+    private HighAvailabilityGroup(HAGroupInfo info, Properties properties) {
+        this.info = info;
+        this.properties = properties;
+    }
+    /**
+     * This is for test usage only. In production, the record should be 
retrieved from ZooKeeper.
+     */
+    @VisibleForTesting
+    HighAvailabilityGroup(HAGroupInfo info, Properties properties, 
ClusterRoleRecord record,
+                          State state) {
+        this.info = info;
+        this.properties = properties;
+        this.roleRecord = record;
+        this.state = state;
+    }
+
+    public static HAGroupInfo getHAGroupInfo(String url, Properties properties)
+            throws SQLException {
+        if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
+            url = url.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1);
+        }
+        if (!(url.contains("[") && url.contains("|") && url.contains("]"))) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
+                    .setMessage(String.format("URL %s is not a valid HA 
connection string", url))
+                    .build()
+                    .buildException();
+        }
+        url = url.substring(url.indexOf("[") + 1, url.indexOf("]"));
+        String[] urls = url.split("\\|");
+
+        String name = properties.getProperty(PHOENIX_HA_GROUP_ATTR);
+        if (StringUtils.isEmpty(name)) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.HA_INVALID_PROPERTIES)
+                    .setMessage(String.format("HA group name can not be empty 
for HA URL %s", url))
+                    .build()
+                    .buildException();
+        }
+        return new HAGroupInfo(name, urls[0], urls[1]);
+    }
+
+    /**
+     * Get an instance of HA group given the HA connecting URL (with "|") and 
client properties.
+     * <p>
+     * The HA group does not have a public constructor. This method is the 
only public one for
+     * getting an HA group instance. The reason is that an HA group is 
considered expensive to create
+     * and maintain. Caching it will make it reusable for all connection 
requests to this group.
+     * <p>
+     * It will return the cached instance, if any, for the target HA group. 
The HA group creation
+     * and initialization are blocking operations. Upon initialization 
failure, the HA group
+     * information may be saved in a negative cache iff the cause is due to 
missing cluster role
+     * records. In the presence of an empty (not null or exception) return value, the client may
+     * choose to fall back to a single-cluster connection to compensate for missing cluster role records.
+     *
+     * @return Optional of target HA group (initialized), or empty if missing 
cluster role records
+     * @throws SQLException fails to get or initialize an HA group
+     */
+    public static Optional<HighAvailabilityGroup> get(String url, Properties 
properties)
+            throws SQLException {
+        HAGroupInfo info = getHAGroupInfo(url, properties);
+        if (MISSING_CRR_GROUPS_CACHE.getIfPresent(info) != null) {
+            return Optional.empty();
+        }
+
+        HighAvailabilityGroup haGroup = GROUPS.computeIfAbsent(info,
+                haGroupInfo -> new HighAvailabilityGroup(haGroupInfo, 
properties));
+        try {
+            haGroup.init();
+        } catch (Exception e) {
+            GROUPS.remove(info);
+            haGroup.close();
+            try {
+                CuratorFramework curator1 = 
CURATOR_CACHE.getIfPresent(info.getUrl1());
+                CuratorFramework curator2 = 
CURATOR_CACHE.getIfPresent(info.getUrl2());
+                if (curator1 != null && curator2 != null) {
+                    Stat node1 = 
curator1.checkExists().forPath(info.getZkPath());
+                    Stat node2 = 
curator2.checkExists().forPath(info.getZkPath());
+                    if (node1 == null && node2 == null) {
+                        // The HA group fails to initialize due to missing 
cluster role records on
+                        // both ZK clusters. We will put this HA group into 
negative cache.
+                        MISSING_CRR_GROUPS_CACHE.put(info, true);
+                        return Optional.empty();
+                    }
+                }
+            } catch (Exception e2) {
+                LOG.error("HA group {} failed to initialized. Got exception 
when checking if znode"
+                        + " exists on the two ZK clusters.", info, e2);
+            }
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
+                    .setMessage(String.format("Cannot start HA group %s for 
URL %s", haGroup, url))
+                    .setRootCause(e)
+                    .build()
+                    .buildException();
+        }
+        return Optional.of(haGroup);
+    }
+
+    /**
+     * This method helps the client get the single cluster to fall back to.
+     * <p>
+     * When getting HA group using {@link #get(String, Properties)}, it may 
return empty (not null
+     * or exception) value. In that case the client may choose to fall back to a single-cluster
+     * connection to compensate for missing cluster role records instead of throwing errors.
+     *
+     * @param url        The HA connection url optionally; empty optional if 
properties disables fallback
+     * @param properties The client connection properties
+     * @return The connection url of the single cluster to fall back
+     * @throws SQLException if fails to get HA information and/or invalid 
properties are seen
+     */
+    static Optional<String> getFallbackCluster(String url, Properties 
properties) throws SQLException {
+        HAGroupInfo haGroupInfo = getHAGroupInfo(url, properties);
+
+        String fallback = 
properties.getProperty(PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY,
+                PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_DEFAULT);
+        if (!Boolean.parseBoolean(fallback)) {
+            LOG.info("Fallback to single cluster not enabled for the HA group 
{} per configuration."
+                    + " HA url: '{}'.", haGroupInfo.getName(), url);
+            return Optional.empty();
+        }
+        String fallbackCluster = 
properties.getProperty(PHOENIX_HA_FALLBACK_CLUSTER_KEY);
+        if (StringUtils.isEmpty(fallbackCluster)) {
+            fallbackCluster = haGroupInfo.getUrl1();
+        }
+        LOG.info("Falling back to single cluster '{}' for the HA group {} to 
serve HA connection "
+                        + "request against url '{}'.",
+                fallbackCluster, haGroupInfo.getName(), url);
+        return Optional.of(fallbackCluster);
+    }
+
+    /**
+     * Get an active curator ZK client for the given properties and ZK 
endpoint.
+     * <p>
+     * This can come from a cached object since Curator should be shared per 
cluster.
+     *
+     * @param jdbcUrl    the ZK endpoint host:port or the JDBC connection 
String host:port:/hbase
+     * @param properties the properties defining time out values and retry 
count
+     * @return a new Curator framework client
+     */
+    @SuppressWarnings("UnstableApiUsage")
+    public static CuratorFramework getCurator(String jdbcUrl, Properties 
properties)
+            throws IOException {
+        try {
+            return CURATOR_CACHE.get(jdbcUrl, () -> {
+                CuratorFramework curator = createCurator(jdbcUrl, properties);
+                if 
(!curator.blockUntilConnected(PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT,
+                        TimeUnit.MILLISECONDS))
+                    throw new RuntimeException("Failed to connect to the 
CuratorFramework in "
+                            + "timeout " + 
PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT + " ms");
+                return curator;
+            });
+        } catch (Exception e) {
+            LOG.error("Failed to get an active curator for url {}", jdbcUrl, e);
+            // invalidate the cache when getting/creating throws exception
+            CURATOR_CACHE.invalidate(jdbcUrl);
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Create a curator ZK client for the given properties and ZK endpoint.
+     * <p>
+     * Unless caller needs a new curator, it should use {@link 
#getCurator(String, Properties)}.
+     */
+    private static CuratorFramework createCurator(String jdbcUrl, Properties 
properties) {
+        // Get the ZK endpoint in host:port format by removing JDBC protocol 
and HBase root node
+        final String zkUrl;
+        if (jdbcUrl.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
+            jdbcUrl = jdbcUrl.substring(PhoenixRuntime.JDBC_PROTOCOL.length() 
+ 1);
+        }
+        Preconditions.checkArgument(!StringUtils.isEmpty(jdbcUrl), "JDBC url 
is empty!");
+        String[] urls = jdbcUrl.split(":");
+        if (urls.length == 1) {
+            zkUrl = String.format("%s:%s", urls[0], 
HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+        } else if (urls.length == 2 || urls.length == 3) {
+            zkUrl = String.format("%s:%s", urls[0], urls[1]);
+        } else {
+            throw new IllegalArgumentException("Invalid JDBC url! " + jdbcUrl);
+        }
+
+        // Get timeout and retry counts
+        String connectionTimeoutMsProp = properties.getProperty(
+                PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY);
+        final int connectionTimeoutMs = 
!StringUtils.isEmpty(connectionTimeoutMsProp)
+                ? Integer.parseInt(connectionTimeoutMsProp)
+                : PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT;
+        String sessionTimeoutMsProps = 
properties.getProperty(PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY);
+        final int sessionTimeoutMs = 
!StringUtils.isEmpty(sessionTimeoutMsProps)
+                ? Integer.parseInt(sessionTimeoutMsProps)
+                : PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_DEFAULT;
+        final RetryPolicy retryPolicy = createRetryPolicy(properties);
+
+        CuratorFramework curator = CuratorFrameworkFactory
+                .builder()
+                .connectString(zkUrl)
+                .namespace(PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE)
+                .connectionTimeoutMs(connectionTimeoutMs)
+                .sessionTimeoutMs(sessionTimeoutMs)
+                .retryPolicy(retryPolicy)
+                .canBeReadOnly(true)
+                .build();
+        curator.start();
+        return curator;
+    }
+
+    /**
+     * Create a Curator retry policy from properties.
+     * <p>
+     * If properties is null, return a default retry policy.
+     *
+     * @param properties properties defining timeout and max retries
+     * @return a retry policy which can be used for Curator operations
+     */
+    public static RetryPolicy createRetryPolicy(Properties properties) {
+        if (properties == null) {
+            return RETRY_POLICY;
+        }
+        String baseSleepTimeMsProp = 
properties.getProperty(PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_KEY);
+        int baseSleepTimeMs = StringUtils.isNotEmpty(baseSleepTimeMsProp)
+                ? Integer.parseInt(baseSleepTimeMsProp)
+                : PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_DEFAULT;
+        String maxRetriesProp = 
properties.getProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY);
+        int maxRetries = StringUtils.isNotEmpty(maxRetriesProp)
+                ? Integer.parseInt(maxRetriesProp)
+                : PHOENIX_HA_ZK_RETRY_MAX_DEFAULT;
+        String maxSleepTimeMsProp = 
properties.getProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY);
+        int maxSleepTimeMs = StringUtils.isNotEmpty(maxSleepTimeMsProp)
+                ? Integer.parseInt(maxSleepTimeMsProp)
+                : PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_DEFAULT;
+        return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries, 
maxSleepTimeMs);
+    }
+
+    /**
+     * Initialize this HA group by registering ZK watchers and getting initial 
cluster role record.
+     * <p>
+     * If this is already initialized, calling this method is a no-op. This method is lock-free:
+     * the current thread will either return fast or wait for the in-progress initialization until timeout.
+     */
+    public void init() throws IOException {
+        if (state != State.UNINITIALIZED) {
+            return;
+        }
+
+        PairOfSameType<HAClusterRoleManager> newRoleManagers = new 
PairOfSameType<>(
+                new HAClusterRoleManager(info.urls.getFirst(), properties),
+                new HAClusterRoleManager(info.urls.getSecond(), properties));
+        if (!roleManagers.compareAndSet(null, newRoleManagers)) {
+            LOG.info("Someone already started role managers; waiting for that 
one...");
+            waitForInitialization(properties);
+            return;
+        }
+
+        Future<?> f1 = roleManagerExecutor.submit(newRoleManagers.getFirst());
+        Future<?> f2 = roleManagerExecutor.submit(newRoleManagers.getSecond());
+        try {
+            waitForInitialization(properties);
+        } catch (IOException e) {
+            // HA group that fails to initialize will not be kept in the 
global cache.
+            // Next connection request will create and initialize a new HA 
group.
+            // Before returning in case of exception, following code will 
cancel the futures.
+            f1.cancel(true);
+            f2.cancel(true);
+            throw e;
+        }
+
+        assert roleRecord != null;
+        LOG.info("Initial cluster role for HA group {} is {}", info, 
roleRecord);
+    }
+
+    /**
+     * Helper method that will block current thread until the HA group is 
initialized.
+     * <p>
+     * After returning, the HA state might not be in READY state. That is 
possible when a new ZK
+     * node change is detected triggering HA group to become IN_TRANSIT state.
+     *
+     * @param properties the connection properties
+     * @throws IOException when current HA group is not initialized before 
timeout
+     */
+    private void waitForInitialization(Properties properties) throws 
IOException {
+        String connectionTimeoutMsProp = properties.getProperty(
+                PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY);
+        int timeout = !StringUtils.isEmpty(connectionTimeoutMsProp)
+                ? Integer.parseInt(connectionTimeoutMsProp)
+                : PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT;
+        boolean started = false;
+        try {
+            started = roleManagerLatch.await(timeout, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOG.warn("Got interrupted when waiting for cluster role managers 
to start", e);
+            Thread.currentThread().interrupt();
+        }
+        if (!started) {
+            LOG.warn("Timed out {}ms waiting for HA group '{}' to be 
initialized.", timeout, info);
+            throw new IOException("Failed to initialize HA group " + info);
+        }
+    }
+
+    /**
+     * Create a JDBC connection in this high availability group.
+     *
+     * @param properties connection properties
+     * @return a JDBC connection implementation
+     * @throws SQLException if fails to connect a JDBC connection
+     */
+    public Connection connect(Properties properties) throws SQLException {
+        if (state != State.READY) {
+            throw new SQLExceptionInfo
+                    .Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
+                    .setMessage("HA group is not ready!")
+                    .setHaGroupInfo(info.toString())
+                    .build()
+                    .buildException();
+        }
+        return roleRecord.getPolicy().provide(this, properties);
+    }
+
+    /**
+     * Get a Phoenix connection against the current active HBase cluster.
+     * <p>
+     * If there is no active cluster, it will throw an exception instead of 
blocking or retrying.
+     *
+     * @param properties connection properties
+     * @return a Phoenix connection to current active HBase cluster
+     * @throws SQLException if fails to get a connection
+     */
+    PhoenixConnection connectActive(final Properties properties) throws 
SQLException {
+        try {
+            Optional<String> url = roleRecord.getActiveUrl();
+            if (state == State.READY && url.isPresent()) {
+                PhoenixConnection conn = connectToOneCluster(url.get(), 
properties);
+                // After connection is created, double check if the cluster is 
still ACTIVE
+                // This is to make sure the newly created connection will not 
be returned to client
+                // if the target cluster is not active any more. This can 
happen during failover.
+                if (state == State.READY && isActive(conn)) {
+                    return conn;
+                } else {
+                    conn.close();

Review Comment:
   Is there any way for the conn to leak here? I don't immediately see one, but 
worth a second look since there's no easy way to put the close() in a finally 
block. 
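   For comparison, a hypothetical publish-or-close shape (not the PR's actual API; the real
code retries rather than throws) that makes it structurally impossible for the freshly
created connection to leak on the non-ACTIVE path:

```java
import java.sql.Connection;
import java.sql.SQLException;

public class CloseOnFailureSketch {
    interface ConnectionFactory {
        Connection create() throws SQLException;
        boolean stillActive(Connection conn) throws SQLException;
    }

    // Only "publish" the connection after the post-creation check passes;
    // every other path, including exceptions, closes it in the finally block.
    static Connection connectChecked(ConnectionFactory factory) throws SQLException {
        Connection conn = factory.create();
        boolean handedOff = false;
        try {
            if (factory.stillActive(conn)) {
                handedOff = true;
                return conn;
            }
            throw new SQLException("Cluster no longer ACTIVE");
        } finally {
            if (!handedOff) {
                conn.close(); // nothing leaks past this point
            }
        }
    }
}
```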



##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java:
##########
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.io.Closeable;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicValue;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.util.JacksonUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The command line tool to manage high availability (HA) groups and their 
cluster roles.
+ */
+public class PhoenixHAAdminTool extends Configured implements Tool {
+    // Following are the return values of this tool. We need these to be very explicit because
+    // external systems calling this tool may need to retry, alert, or audit the 
operations of cluster roles.
+    public static final int RET_SUCCESS = 0; // Saul Goodman
+    public static final int RET_ARGUMENT_ERROR = 1; // arguments are invalid
+    public static final int RET_SYNC_ERROR = 2; // error syncing from manifest to ZK
+    public static final int RET_REPAIR_FOUND_INCONSISTENCIES = 3; // inconsistencies found when repairing current ZK
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PhoenixHAAdminTool.class);
+
+    private static final Option HELP_OPT = new Option("h", "help", false, 
"Show this help");
+    private static final Option FORCEFUL_OPT =
+            new Option("F", "forceful", false,
+                    "Forceful writing cluster role records ignoring errors on 
other clusters");
+    private static final Option MANIFEST_OPT =
+            new Option("m", "manifest", true, "Manifest file containing 
cluster role records");
+    private static final Option LIST_OPT =
+            new Option("l", "list", false, "List all HA groups stored on this 
ZK cluster");
+    private static final Option REPAIR_OPT = new Option("r", "repair", false,
+            "Verify all HA groups stored on this ZK cluster and repair if 
inconsistency found");
+    @VisibleForTesting
+    static final Options OPTIONS = new Options()
+            .addOption(HELP_OPT)
+            .addOption(FORCEFUL_OPT)
+            .addOption(MANIFEST_OPT)
+            .addOption(LIST_OPT)
+            .addOption(REPAIR_OPT);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        CommandLine commandLine;
+        try {
+            commandLine = parseOptions(args);
+        } catch (Exception e) {
+            System.err.println(
+                    "ERROR: Unable to parse command-line arguments " + 
Arrays.toString(args) + " due to: " + e);
+            printUsageMessage();
+            return RET_ARGUMENT_ERROR;
+        }
+
+        if (commandLine.hasOption(HELP_OPT.getOpt())) {
+            printUsageMessage();
+            return RET_SUCCESS;
+        } else if (commandLine.hasOption(LIST_OPT.getOpt())) { // list
+            String zkUrl = getLocalZkUrl(getConf()); // Admin is created against local ZK cluster
+            try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
+                List<ClusterRoleRecord> records = admin.listAllClusterRoleRecordsOnZookeeper();
+                JacksonUtil.getObjectWriterPretty().writeValue(System.out, records);
+            }
+        } else if (commandLine.hasOption(MANIFEST_OPT.getOpt())) { // create or update
+            String fileName = commandLine.getOptionValue(MANIFEST_OPT.getOpt());
+            List<ClusterRoleRecord> records = readRecordsFromFile(fileName);
+            boolean forceful = commandLine.hasOption(FORCEFUL_OPT.getOpt());
+            Map<String, List<String>> failedHaGroups = syncClusterRoleRecords(records, forceful);
+            if (!failedHaGroups.isEmpty()) {
+                System.out.println("Found following HA groups are failing to write the clusters:");
+                failedHaGroups.forEach((k, v) ->
+                        System.out.printf("%s -> [%s]\n", k, String.join(",", v)));
+                return RET_SYNC_ERROR;
+            }
+        } else if (commandLine.hasOption(REPAIR_OPT.getOpt()))  { // verify and repair
+            String zkUrl = getLocalZkUrl(getConf()); // Admin is created against local ZK cluster
+            try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
+                List<String> inconsistentRecord = admin.verifyAndRepairWithRemoteZnode();
+                if (!inconsistentRecord.isEmpty()) {
+                    System.out.println("Found following inconsistent cluster role records: ");
+                    System.out.print(String.join(",", inconsistentRecord));
+                    return RET_REPAIR_FOUND_INCONSISTENCIES;
+                }
+            }
+        }
+        return RET_SUCCESS;
+    }
+
+    /**
+     * Read cluster role records defined in the given manifest file.
+     *
+     * @param file The local manifest file name to read from
+     * @return list of cluster role records defined in the manifest file
+     * @throws Exception when parsing or reading from the input file
+     */
+    @VisibleForTesting
+    List<ClusterRoleRecord> readRecordsFromFile(String file) throws Exception {
+        Preconditions.checkArgument(!StringUtils.isEmpty(file));
+        String fileType = FilenameUtils.getExtension(file);
+        switch (fileType) {
+        case "json":
+            // TODO: use jackson or standard JSON library according to PHOENIX-5789
+            try (Reader reader = new FileReader(file)) {
+                ClusterRoleRecord[] records =
+                        JacksonUtil.getObjectReader(ClusterRoleRecord[].class).readValue(reader);
+                return Arrays.asList(records);
+            }
+        case "yaml":
+            LOG.error("YAML file is not yet supported. See W-8274533");
+        default:
+            throw new Exception("Can not read cluster role records from file '" + file + "' " +
+                    "reason: unsupported file type");
+        }
+    }
+
+    /**
+     * Helper method to write the given cluster role records into the ZK clusters respectively.
+     *
+     * // TODO: add retry logic
+     *
+     * @param records The cluster role record list to save on ZK
+     * @param forceful if true, this method will ignore errors on other clusters; otherwise it will
+     *                 not update the next cluster (in order) if there is any failure on the current cluster
+     * @return a map of HA group name to the list of cluster URLs whose cluster role record failed to write
+     */
+    private Map<String, List<String>> syncClusterRoleRecords(List<ClusterRoleRecord> records,
+            boolean forceful) throws IOException {
+        Map<String, List<String>> failedHaGroups = new HashMap<>();
+        for (ClusterRoleRecord record : records) {
+            String haGroupName = record.getHaGroupName();
+            try (PhoenixHAAdminHelper admin1 = new PhoenixHAAdminHelper(record.getZk1(), getConf(), HighAvailibilityCuratorProvider.INSTANCE);
+                    PhoenixHAAdminHelper admin2 = new PhoenixHAAdminHelper(record.getZk2(), getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
+                // Update the previously ACTIVE cluster first.
+                // It reduces the chances of split-brain between clients and clusters.
+                // If we can not determine the previous ACTIVE cluster, update the new STANDBY cluster first.
+                final PairOfSameType<PhoenixHAAdminHelper> pair;
+                if (admin1.isCurrentActiveCluster(haGroupName)) {
+                    pair = new PairOfSameType<>(admin1, admin2);
+                } else if (admin2.isCurrentActiveCluster(haGroupName)) {
+                    pair = new PairOfSameType<>(admin2, admin1);
+                } else if (record.getRole(admin1.getZkUrl()) == ClusterRole.STANDBY) {
+                    pair = new PairOfSameType<>(admin1, admin2);
+                } else {
+                    pair = new PairOfSameType<>(admin2, admin1);

Review Comment:
   how would we ever get here? (if neither 1 nor 2 are ACTIVE, and 1 isn't STANDBY?)
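
   For illustration, one assumed scenario (not taken from the patch) that would fall through to the final else: a brand-new HA group whose znodes do not exist yet on either cluster, with the manifest marking cluster 1 as ACTIVE. A minimal sketch of the branch logic under that assumption, with illustrative local variables standing in for the real calls:

       // Hypothetical walk-through only; values are illustrative, not from the patch.
       boolean admin1Active = false;                  // admin1.isCurrentActiveCluster(haGroupName): no znode yet
       boolean admin2Active = false;                  // admin2.isCurrentActiveCluster(haGroupName): no znode yet
       ClusterRole newRoleOfZk1 = ClusterRole.ACTIVE; // record.getRole(admin1.getZkUrl()) in the new manifest
       if (admin1Active) {
           // update cluster 1 first
       } else if (admin2Active) {
           // update cluster 2 first
       } else if (newRoleOfZk1 == ClusterRole.STANDBY) {
           // cluster 1 is the new STANDBY; update it first
       } else {
           // reached: neither cluster is currently ACTIVE and cluster 1 is not the new STANDBY,
           // so cluster 2 (the new STANDBY) is updated before the new ACTIVE cluster 1
       }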



##########
phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixResultSetTest.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.jdbc;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+
+public class ParallelPhoenixResultSetTest {
+    CompletableFuture<ResultSet> completableRs1;
+    CompletableFuture<ResultSet> completableRs2;
+
+    ParallelPhoenixResultSet resultSet;
+
+    @Before
+    public void init() {
+        completableRs1 = Mockito.mock(CompletableFuture.class);
+        completableRs2 = Mockito.mock(CompletableFuture.class);
+        resultSet =
+                new ParallelPhoenixResultSet(
+                        new ParallelPhoenixContext(new Properties(), null,
+                                HighAvailabilityTestingUtility
+                                        .getListOfSingleThreadExecutorServices(),
+                                null),
+                        completableRs1, completableRs2);
+    }
+
+    @Test
+    public void testUnbound() throws SQLException {
+        ResultSet rs = resultSet.getResultSet();
+        assertNull(rs);
+    }
+
+    @Test
+    public void testNextBound() throws SQLException {
+        ResultSet rs = Mockito.mock(ResultSet.class);
+        resultSet.setResultSet(rs);
+        resultSet.next();
+        Mockito.verify(rs).next();
+        Mockito.verifyNoMoreInteractions(rs);
+    }
+
+    @Test
+    public void testRS1WinsNext() throws Exception {
+
+        ResultSet rs1 = Mockito.mock(ResultSet.class);
+        ResultSet rs2 = Mockito.mock(ResultSet.class);
+
+        Executor rsExecutor2 = Mockito.mock(Executor.class);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        //inject a sleep
+        doAnswer(
+                (InvocationOnMock invocation) -> {
+                    Thread thread = new Thread(() -> {
+                        try {
+                            //TODO: Remove this sleep
+                            latch.await(5, TimeUnit.SECONDS);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException();
+                        }
+                        ((Runnable) invocation.getArguments()[0]).run();
+                        return;
+                    });
+                    thread.start();
+                    return null;
+                }
+        ).when(rsExecutor2).execute(any(Runnable.class));
+
+        completableRs1 = CompletableFuture.completedFuture(rs1);
+
+        completableRs2 = CompletableFuture.supplyAsync(() -> rs2, rsExecutor2);
+
+        resultSet =
+                new ParallelPhoenixResultSet(
+                        new ParallelPhoenixContext(new Properties(), null,
+                                HighAvailabilityTestingUtility
+                                        .getListOfSingleThreadExecutorServices(),
+                                null),
+                        completableRs1, completableRs2);
+
+        resultSet.next();
+
+        assertEquals(rs1, resultSet.getResultSet());
+    }
+
+    @Test
+    public void testRS2WinsNext() throws Exception {
+        ResultSet rs1 = Mockito.mock(ResultSet.class);
+        ResultSet rs2 = Mockito.mock(ResultSet.class);
+
+        Executor rsExecutor1 = Mockito.mock(Executor.class);
+        CountDownLatch latch = new CountDownLatch(1);
+        //inject a sleep
+        doAnswer(
+                (InvocationOnMock invocation) -> {
+                    Thread thread = new Thread(() -> {
+                        try {
+                            latch.await(5, TimeUnit.SECONDS);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException();
+                        }
+                        ((Runnable) invocation.getArguments()[0]).run();
+                        return;
+                    });
+                    thread.start();
+                    return null;
+                }
+        ).when(rsExecutor1).execute(any(Runnable.class));
+
+        completableRs1 = CompletableFuture.supplyAsync(() -> rs1, rsExecutor1);
+        completableRs2 = CompletableFuture.completedFuture(rs2);
+
+        resultSet =
+                new ParallelPhoenixResultSet(
+                        new ParallelPhoenixContext(new Properties(), null,
+                                HighAvailabilityTestingUtility
+                                        .getListOfSingleThreadExecutorServices(),
+                                null),
+                        completableRs1, completableRs2);
+
+        resultSet.next();
+
+        assertEquals(rs2, resultSet.getResultSet());
+    }
+
+    @Test
+    public void testRS1FailsImmediatelyNext() throws Exception {
+        ResultSet rs2 = Mockito.mock(ResultSet.class);
+        Executor rsExecutor2 = Mockito.mock(Executor.class);
+        CountDownLatch latch = new CountDownLatch(1);
+        //inject a sleep
+        doAnswer(
+                (InvocationOnMock invocation) -> {
+                    Thread thread = new Thread(() -> {
+                        try {
+                            latch.await(5, TimeUnit.SECONDS);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException();
+                        }
+                        ((Runnable) invocation.getArguments()[0]).run();
+                        return;
+                    });
+                    thread.start();
+                    return null;
+                }
+        ).when(rsExecutor2).execute(any(Runnable.class));
+
+        //CompletableFuture.failedFuture(new RuntimeException("Failure"));

Review Comment:
   nit: commented code can be removed
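
   For reference, the commented line hints at CompletableFuture.failedFuture, which only exists from Java 9 onward; a minimal Java 8 sketch of the same idea (the variable name is illustrative):

       // Java 8 equivalent of CompletableFuture.failedFuture(new RuntimeException("Failure"))
       CompletableFuture<ResultSet> failedRs1 = new CompletableFuture<>();
       failedRs1.completeExceptionally(new RuntimeException("Failure"));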



##########
phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java:
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.RET_SUCCESS;
+import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.getLocalZkUrl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.contains;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit test for {@link PhoenixHAAdminTool} including the helper class {@link PhoenixHAAdminHelper}.
+ *
+ * @see PhoenixHAAdminToolIT
+ */
+public class PhoenixHAAdminToolTest {
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixHAAdminToolTest.class);
+    private static final String ZK1 = "zk1:2181:/hbase";
+    private static final String ZK2 = "zk2:2181:/hbase";
+    private static final PrintStream STDOUT = System.out;
+    private static final ByteArrayOutputStream STDOUT_CAPTURE = new ByteArrayOutputStream();
+
+    private final PhoenixHAAdminTool.HighAvailibilityCuratorProvider mockHighAvailibilityCuratorProvider = Mockito.mock(PhoenixHAAdminTool.HighAvailibilityCuratorProvider.class);
+
+    /** Use mocked curator since there is no mini-ZK cluster. */
+    private final CuratorFramework curator = Mockito.mock(CuratorFramework.class);
+    /** HA admin to test for one test case. */
+    private final PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(ZK1, new Configuration(), mockHighAvailibilityCuratorProvider);
+
+    private String haGroupName;
+    private ClusterRoleRecord recordV1;
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    @Before
+    public void setup() throws Exception {
+        when(mockHighAvailibilityCuratorProvider.getCurator(Mockito.anyString(), Mockito.anyObject())).thenReturn(curator);
+        haGroupName = testName.getMethodName();
+        recordV1 = new ClusterRoleRecord(
+                haGroupName, HighAvailabilityPolicy.FAILOVER,
+                ZK1, ClusterRole.ACTIVE,
+                ZK2, ClusterRole.STANDBY,
+                1);
+        saveRecordV1ToZk();
+    }
+
+    @After
+    public void after() {
+        // reset STDOUT in case it was captured for testing
+        System.setOut(STDOUT);
+    }
+
+    /**
+     * Test command line options.
+     *
+     * In the test body, we split sections by {} to make sure no variable is mistakenly reused.
+     */
+    @Test
+    public void testCommandLineOption() throws Exception {
+        { // no value for -m option
+            String[] args = { "-m" };
+            int ret = ToolRunner.run(new PhoenixHAAdminTool(), args);
+            assertEquals(PhoenixHAAdminTool.RET_ARGUMENT_ERROR, ret);
+        }
+        { // -l does not work with -m option
+            String[] args = { "-l", "-m", "cluster-role-records.yaml" };
+            int ret = ToolRunner.run(new PhoenixHAAdminTool(), args);
+            assertEquals(PhoenixHAAdminTool.RET_ARGUMENT_ERROR, ret);
+        }
+        { // -l does not work with -F/--forceful option
+            String[] args = { "-l", "-F"};
+            int ret = ToolRunner.run(new PhoenixHAAdminTool(), args);
+            assertEquals(PhoenixHAAdminTool.RET_ARGUMENT_ERROR, ret);
+        }
+        { // -l does not work with --repair option
+            String[] args = { "-l", "-r"};
+            int ret = ToolRunner.run(new PhoenixHAAdminTool(), args);
+            assertEquals(PhoenixHAAdminTool.RET_ARGUMENT_ERROR, ret);
+        }
+        { // -m does not work with --repair option
+            String[] args = { "-m", "cluster-role-records.yaml", "-r"};
+            int ret = ToolRunner.run(new PhoenixHAAdminTool(), args);
+            assertEquals(PhoenixHAAdminTool.RET_ARGUMENT_ERROR, ret);
+        }
+    }
+
+    /**
+     * Test that helper method works for reading cluster role records from JSON file.
+     */
+    @Test
+    public void testReadRecordsFromFileJson() throws Exception {
+        { // one record in JSON file
+            String fileName = ClusterRoleRecordTest.createJsonFileWithRecords(recordV1);
+            List<ClusterRoleRecord> records = new PhoenixHAAdminTool().readRecordsFromFile(fileName);
+            assertEquals(1, records.size());
+            assertTrue(records.contains(recordV1));
+        }
+
+        { // two records in JSON file
+            String haGroupName2 = haGroupName + RandomStringUtils.randomAlphabetic(3);
+            ClusterRoleRecord record2 = new ClusterRoleRecord(
+                    haGroupName2, HighAvailabilityPolicy.FAILOVER,
+                    ZK1, ClusterRole.ACTIVE,
+                    ZK2, ClusterRole.STANDBY,
+                    1);
+            String fileName = ClusterRoleRecordTest.createJsonFileWithRecords(recordV1, record2);
+            List<ClusterRoleRecord> records = new PhoenixHAAdminTool().readRecordsFromFile(fileName);
+            assertEquals(2, records.size());
+            assertTrue(records.contains(recordV1));
+            assertTrue(records.contains(record2));
+        }
+    }
+
+    /**
+     * Test that agent will try to crate znode if it does not exist.

Review Comment:
   tiny nit: "create znode"



##########
phoenix-core/src/test/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.exception.FailoverSQLException;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.HighAvailabilityGroup.HAGroupInfo;
+
+/**
+ * Unit test for {@link FailoverPhoenixConnection}.
+ *
+ * @see FailoverPhoenixConnectionIT
+ */
+public class FailoverPhoenixConnectionTest {
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverPhoenixConnectionTest.class);
+
+    @Mock PhoenixConnection connection1;
+    @Mock PhoenixConnection connection2;
+    @Mock HighAvailabilityGroup haGroup;
+
+    final HAGroupInfo haGroupInfo = new HAGroupInfo("fake", "zk1", "zk2");
+    FailoverPhoenixConnection failoverConnection; // this connection itself is not mocked or spied.
+
+    @Before
+    public void init() throws SQLException {
+        MockitoAnnotations.initMocks(this);
+        when(haGroup.getGroupInfo()).thenReturn(haGroupInfo);
+        when(haGroup.connectActive(any(Properties.class))).thenReturn(connection1);
+
+        failoverConnection = new FailoverPhoenixConnection(haGroup, new Properties());
+    }
+
+    /**
+     * Test helper method {@link FailoverPhoenixConnection#wrapActionDuringFailover}.
+     */
+    @Test
+    public void testWrapActionDuringFailover() throws SQLException {
+        // Test SupplierWithSQLException which returns a value
+        String str = "Hello, World!";
+        assertEquals(str, failoverConnection.wrapActionDuringFailover(() -> str));
+
+        // Test RunWithSQLException which does not return value
+        final AtomicInteger counter = new AtomicInteger(0);
+        failoverConnection.wrapActionDuringFailover(counter::incrementAndGet);
+        assertEquals(1, counter.get());
+    }
+
+    /**
+     * Test that after calling failover(), the old connection got closed with FailoverSQLException,
+     * and a new Phoenix connection is opened.
+     */
+    @Test
+    public void testFailover() throws SQLException {
+        // Make HAGroup return a different phoenix connection when it gets called next time
+        when(haGroup.connectActive(any(Properties.class))).thenReturn(connection2);
+
+        // explicit call failover
+        failoverConnection.failover(1000L);
+
+        // The old connection should have been closed due to failover
+        verify(connection1, times(1)).close(any(FailoverSQLException.class));
+        // A new Phoenix connection is wrapped underneath
+        assertEquals(connection2, failoverConnection.getWrappedConnection());
+    }
+
+    /**
+     * Test static {@link FailoverPhoenixConnection#failover(Connection, long)} method.
+     */
+    @Test
+    public void testFailoverStatic() throws SQLException {
+        try {
+            FailoverPhoenixConnection.failover(connection1, 1000L);
+            fail("Should have failed since plain phoenix connection can not failover!");
+        } catch (SQLException e) {
+            LOG.info("Got expected exception when trying to failover on non-HA connection", e);

Review Comment:
   nit: can we match the SQLExceptionCode to make sure it's the right error?
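
   For example, a sketch of such an assertion inside the catch block; FAILOVER_NOT_SUPPORTED is a placeholder constant here, to be replaced with whichever SQLExceptionCode the implementation actually raises for a plain (non-HA) connection:

       } catch (SQLException e) {
           LOG.info("Got expected exception when trying to failover on non-HA connection", e);
           // Hypothetical constant: substitute the SQLExceptionCode thrown by the implementation.
           assertEquals(SQLExceptionCode.FAILOVER_NOT_SUPPORTED.getErrorCode(), e.getErrorCode());
       }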



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
