gjacoby126 commented on code in PR #1430: URL: https://github.com/apache/phoenix/pull/1430#discussion_r882048420
########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/FailoverPolicy.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import java.sql.Connection; +import java.util.Properties; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.http.annotation.Immutable; +import org.apache.phoenix.exception.FailoverSQLException; + +/** + * A failover policy defines how failover connection deals with existing connections in case of + * cluster role transition is detected. + * + * When an HBase cluster is not in ACTIVE role any more, all connections against it will get closed. + * To handle a failover event, the failover connection will use the failover policy for taking + * further actions. Those supported failover policies are defined here, but in future we can load + * the policy implemented and configured by user at runtime for each connection. The default policy + * requires that clients have to deal with the failover exception explicitly. + */ +@Immutable +@FunctionalInterface +public interface FailoverPolicy { + String PHOENIX_HA_FAILOVER_POLICY_ATTR = "phoenix.ha.failover.policy"; + String PHOENIX_HA_FAILOVER_COUNT_ATTR = "phoenix.ha.failover.count"; + + /** + * Should try to failover by connecting to current ACTIVE HBase cluster (if any). + * + * @param exception the exception caught upon which this method is possible called + * @param failoverCount how many time so far this failover has been attempted + * @return true if caller should get a new phoenix connection against the ACTIVE HBase cluster + */ + boolean shouldFailover(Exception exception, int failoverCount); + + /** + * With this policy, clients have to deal with the failover exception explicitly if any. + * + * A {@link FailoverSQLException} exception will be thrown to the client when they try to use + * the closed connections. Specially, the high availability (HA) framework will not connect to + * the new ACTIVE cluster automatically for clients, but instead a client should: + * - re-connect to this HA group, in which case it will get a new connection wrapping a Phoenix + * connection to the newly ACTIVE cluster; OR + * - call static method {@link FailoverPhoenixConnection#failover(Connection,long)} explicitly. + * After that, it can create new Statement/ResultSet and retry the business logic. + * If neither cluster is ACTIVE, connect requests to this HA group will keep getting exception. 
+ */ + class ExplicitFailoverPolicy implements FailoverPolicy { + public static final String NAME = "explicit"; + private static final ExplicitFailoverPolicy INSTANCE = new ExplicitFailoverPolicy(); + + @Override + public boolean shouldFailover(Exception e, int failoverCount) { + return false; + } + + @Override + public String toString() { + return NAME; + } + } + + /** + * With this, failover connection will wrap a new Phoenix connection to the new ACTIVE cluster. + * + * If the current operation (e.g. commit or create Statement) fails, the failover connection + * will try to wrap a new Phoenix connection according to this policy. After that, the client + * will be able to create new Statement/ResultSet created against this failover connection. + * While the HA group is failing over, the failover connection may not be able to failover by + * wrapping a new phoenix connection. In that case, clients trying to use this the failover + * connection will get {@link FailoverSQLException} exception. + * + * The failover to ACTIVE cluster is best-effort; if it succeeds, clients do not notice target + * cluster changed. Some cases are not yet well supported with this failover policy, for e.g. + * after failover, the uncommitted mutations are not populated into the new connection. + * + * In case of {@link FailoverSQLException} exception, clients can still re-connect to this HA + * group by creating a new failover connection, OR call static method failover() explicitly. + */ + @InterfaceStability.Unstable Review Comment: It's pretty rare that Phoenix uses either InterfaceAudience or InterfaceStability. No objection to this being here, just FYI. ########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java: ########## @@ -0,0 +1,646 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + +import org.apache.phoenix.exception.FailoverSQLException; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * An implementation of JDBC connection which supports failover between two cluster in an HA group. + * <p> + * During its lifetime, a failover Phoenix connection could possibly connect to two HBase clusters + * in an HA group mutually exclusively. It wraps and delegates the logic to a PhoenixConnection + * object. At any given time, the wrapped connection should only talk to the ACTIVE HBase cluster + * in the HA group. + * <p> + * A failover connection will behave according to the given failover policy upon cluster role + * failover, especially when the current connected HBase cluster becomes STANDBY role from ACTIVE. + * The default behavior (aka default failover policy) will simply close the current connection and + * throw {@link org.apache.phoenix.exception.FailoverSQLException} exception to those clients who + * still use this connection after closing. + * <p> + * This class is not thread safe. + * + * @see HighAvailabilityGroup + * @see FailoverPolicy + */ +public class FailoverPhoenixConnection implements PhoenixMonitoredConnection { + /** + * Failover timeout interval after which failover operation will fail and clients can retry. + */ + public static final String FAILOVER_TIMEOUT_MS_ATTR = "phoenix.ha.failover.timeout.ms"; + public static final long FAILOVER_TIMEOUT_MS_DEFAULT = 10_000; + private static final Logger LOG = LoggerFactory.getLogger(FailoverPhoenixConnection.class); + /** + * Connection properties. + */ + private final Properties properties; + /** + * High availability group. + */ + private final HighAvailabilityGroup haGroup; + /** + * Failover policy, per connection. + */ + private final FailoverPolicy policy; + + /** + * True iff this connection has been closed by the client. + */ + private boolean isClosed; + /** + * The wrapped PhoenixConnection object which could be re-assigned upon failover operation. + */ + private PhoenixConnection connection; + + /** + * Mutation metrics before failover to current connection. + */ + private Map<String, Map<MetricType, Long>> previousMutationMetrics = new HashMap<>(); + /** + * Read metrics before failover to current connection. 
+ */ + private Map<String, Map<MetricType, Long>> previousReadMetrics = new HashMap<>(); + + public FailoverPhoenixConnection(HighAvailabilityGroup haGroup, Properties properties) + throws SQLException { + this.properties = properties; + this.haGroup = haGroup; + this.policy = FailoverPolicy.get(properties); + this.isClosed = false; + this.connection = haGroup.connectActive(properties); + } + + /** + * This is used for explicit failover request made by client. + * <p> + * It fails over to the current ACTIVE HBase cluster; if failover happens in between, this could + * possibly target this same cluster again. + * <p> + * TODO: If there are two ACTIVE clusters, then switch to the other ACTIVE cluster. Review Comment: I assume this TODO is ok to defer? ########## phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java: ########## @@ -165,8 +165,30 @@ public enum MetricType { NUM_METADATA_LOOKUP_FAILURES("nmlf", "Number of Failed metadata lookup calls", LogLevel.DEBUG,PLong.INSTANCE), TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS("tsistrc", "Time spent in RPC calls for systemTable lookup", - LogLevel.DEBUG,PLong.INSTANCE); - + LogLevel.DEBUG,PLong.INSTANCE), + + //HA Related Metrics + HA_PARALLEL_COUNT_OPERATIONS_ACTIVE_CLUSTER("hpoac","Number of Operations to the active cluster",LogLevel.DEBUG,PLong.INSTANCE), + HA_PARALLEL_COUNT_OPERATIONS_STANDBY_CLUSTER("hposc","Number of Operations to the active cluster",LogLevel.DEBUG,PLong.INSTANCE), + HA_PARALLEL_COUNT_FAILED_OPERATIONS_ACTIVE_CLUSTER("hpfac","Number of Operations to the active cluster",LogLevel.DEBUG,PLong.INSTANCE), + HA_PARALLEL_COUNT_FAILED_OPERATIONS_STANDBY_CLUSTER("hpfsc","Number of Operations to the active cluster",LogLevel.DEBUG,PLong.INSTANCE), Review Comment: nit: here again, description of standby metric says active ########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java: ########## @@ -0,0 +1,653 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + +import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.jdbc.ParallelPhoenixUtil.FutureResult; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import static org.apache.phoenix.exception.SQLExceptionCode.CLASS_NOT_UNWRAPPABLE; + +public class ParallelPhoenixConnection implements PhoenixMonitoredConnection { + + private static final Logger LOG = LoggerFactory.getLogger(ParallelPhoenixConnection.class); + + private final ParallelPhoenixContext context; + CompletableFuture<PhoenixConnection> futureConnection1; + CompletableFuture<PhoenixConnection> futureConnection2; + + public ParallelPhoenixConnection(ParallelPhoenixContext context) throws SQLException { + this.context = context; + LOG.debug("First Url: {} Second Url: {}", context.getHaGroup().getGroupInfo().getUrl1(), + context.getHaGroup().getGroupInfo().getUrl2()); + futureConnection1 = context.chainOnConn1(() -> getConnection(context.getHaGroup(), + context.getHaGroup().getGroupInfo().getUrl1(), + context.getProperties())); + futureConnection2 = context.chainOnConn2(() -> getConnection(context.getHaGroup(), + context.getHaGroup().getGroupInfo().getUrl2(), + context.getProperties())); + } + + @VisibleForTesting + ParallelPhoenixConnection(ParallelPhoenixContext context, CompletableFuture<PhoenixConnection> futureConnection1, CompletableFuture<PhoenixConnection> futureConnection2) { + this.context = context; + this.futureConnection1 = futureConnection1; + this.futureConnection2 = futureConnection2; + } + + private static PhoenixConnection getConnection(HighAvailabilityGroup haGroup, String url, Properties properties) { + try { + return haGroup.connectToOneCluster(url, properties); + } catch (SQLException exception) { + if (LOG.isTraceEnabled()) { + LOG.trace(String.format("Failed to get a connection for haGroup %s to %s", haGroup.toString(), url), exception); + } + throw new CompletionException(exception); + } + } + + public CompletableFuture<PhoenixConnection> getFutureConnection1() { + return futureConnection1; + } + + public CompletableFuture<PhoenixConnection> getFutureConnection2() { + return futureConnection2; + } + + @VisibleForTesting + ParallelPhoenixContext getContext() { + return this.context; + } + + Object runOnConnections(Function<PhoenixConnection, ?> function, 
boolean useMetrics) throws SQLException { + return ParallelPhoenixUtil.INSTANCE.runFutures(function, futureConnection1, futureConnection2, context, useMetrics); + } + + PairOfSameType<Object> runOnConnectionsGetAll(Function<PhoenixConnection, ?> function, boolean useMetrics) throws SQLException { + return ParallelPhoenixUtil.INSTANCE.runOnFuturesGetAll(function, futureConnection1, futureConnection2, context, useMetrics); + } + + @Override + public ParallelPhoenixStatement createStatement() throws SQLException { + context.checkOpen(); + + Function<PhoenixConnection, PhoenixMonitoredStatement> function = (T) -> { + try { + return (PhoenixStatement) T.createStatement(); + } catch (SQLException exception) { + throw new CompletionException(exception); + } + }; + + List<CompletableFuture<PhoenixMonitoredStatement>> futures = ParallelPhoenixUtil.INSTANCE.applyFunctionToFutures(function, futureConnection1, + futureConnection2, context, true); + + Preconditions.checkState(futures.size() == 2); + CompletableFuture<PhoenixMonitoredStatement> statement1 = futures.get(0); + CompletableFuture<PhoenixMonitoredStatement> statement2 = futures.get(1); + + //Ensure one statement is successful before returning + ParallelPhoenixUtil.INSTANCE.runFutures(futures, context, true); + + return new ParallelPhoenixStatement(context, statement1, statement2); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + context.checkOpen(); + + Function<PhoenixConnection, PhoenixMonitoredPreparedStatement> function = (T) -> { + try { + return (PhoenixMonitoredPreparedStatement) T.prepareStatement(sql); + } catch (SQLException exception) { + throw new CompletionException(exception); + } + }; + + List<CompletableFuture<PhoenixMonitoredPreparedStatement>> futures = ParallelPhoenixUtil.INSTANCE.applyFunctionToFutures(function, futureConnection1, + futureConnection2, context, true); + + Preconditions.checkState(futures.size() == 2); + CompletableFuture<PhoenixMonitoredPreparedStatement> statement1 = futures.get(0); + CompletableFuture<PhoenixMonitoredPreparedStatement> statement2 = futures.get(1); + + //Ensure one statement is successful before returning + ParallelPhoenixUtil.INSTANCE.runFutures(futures, context, true); + + return new ParallelPhoenixPreparedStatement(this.context, statement1, statement2); + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return null; + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return null; + } + + @Override + public boolean getAutoCommit() throws SQLException { + Function<PhoenixConnection, Boolean> function = (T) -> { + try { + return T.getAutoCommit(); + } catch (SQLException exception) { + throw new CompletionException(exception); + } + }; + + return (boolean) runOnConnections(function, true); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + Function<PhoenixConnection, Void> function = (T) -> { + try { + T.setAutoCommit(autoCommit); + return null; + } catch (SQLException exception) { + throw new CompletionException(exception); + } + }; + + runOnConnections(function, true); + } + + @Override + public void commit() throws SQLException { + Function<PhoenixConnection, Void> function = (T) -> { + try { + T.commit(); + return null; + } catch (SQLException exception) { + throw new CompletionException(exception); + } + }; + + runOnConnections(function, true); + } + + @Override + public void rollback() throws SQLException { + + } + + /** + * 
Close the underlying connections. Returns after any one of the underlying connections have + * closed successfully + * + * @throws SQLException if trying to close both the underlying connections encounters errors + */ + @Override + public void close() throws SQLException { + context.close(); + SQLException closeExp = null; + + Supplier<Boolean> closeSupplier1 = getCloseSupplier(futureConnection1); + Supplier<Boolean> closeSupplier2 = getCloseSupplier(futureConnection2); + + // We can have errors on the chain, we still need to close the underlying connections + // irrespective. Do a close when we're at the end of the chain and wait for any 1 to be + // successful. We need to track which future completed hence use FutureResult + // Enqueue a close operation at the end of the chain in the common ForkJoin Pool + CompletableFuture<FutureResult<CompletableFuture<Boolean>>> closeFuture1 = + context.getChainOnConn1().handle((obj, e) -> { + return CompletableFuture.supplyAsync(closeSupplier1); + }).thenApply(t -> new FutureResult<>(t, 0)); + CompletableFuture<FutureResult<CompletableFuture<Boolean>>> closeFuture2 = + context.getChainOnConn2().handle((obj, e) -> { + return CompletableFuture.supplyAsync(closeSupplier2); + }).thenApply(t -> new FutureResult<>(t, 1)); + + FutureResult<CompletableFuture<Boolean>> result = + (FutureResult<CompletableFuture<Boolean>>) ParallelPhoenixUtil.INSTANCE + .getAnyOfNonExceptionally(Arrays.asList(closeFuture1, closeFuture2), context); + + try { + ParallelPhoenixUtil.INSTANCE.getFutureNoRetry(result.getResult(), context); + return; + } catch (Exception e) { + closeExp = new SQLException(e); + } + // The previous close encountered an exception try the other one + CompletableFuture<FutureResult<CompletableFuture<Boolean>>> otherFuture = + (result.getIndex() == 0) ? closeFuture2 : closeFuture1; + + try { + FutureResult<CompletableFuture<Boolean>> otherResult = Review Comment: Do we risk leaking connections if one connection closes but the other doesn't? Is there ever another opportunity to clean it up? ########## phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixPreparedStatementTest.java: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.jdbc; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +import static org.junit.Assert.assertEquals; + + +public class ParallelPhoenixPreparedStatementTest { + + ParallelPhoenixContext context; + CompletableFuture<PhoenixMonitoredPreparedStatement> future1; + CompletableFuture<PhoenixMonitoredPreparedStatement> future2; + PhoenixMonitoredPreparedStatement statement1; + PhoenixMonitoredPreparedStatement statement2; + + + ParallelPhoenixPreparedStatement phoenixPreparedStatement; + + @Before + public void init() throws Exception { + context = new ParallelPhoenixContext(new Properties(), Mockito.mock(HighAvailabilityGroup.class), + HighAvailabilityTestingUtility.getListOfSingleThreadExecutorServices(), null); + + statement1 = Mockito.mock(PhoenixMonitoredPreparedStatement.class); + statement2 = Mockito.mock(PhoenixMonitoredPreparedStatement.class); + + future1 = CompletableFuture.completedFuture(statement1); + future2 = CompletableFuture.completedFuture(statement2); + + phoenixPreparedStatement = new ParallelPhoenixPreparedStatement(context,future1,future2); + } + + @Test + public void getStatement1() throws SQLException { + future1 = Mockito.mock(CompletableFuture.class); + future2 = Mockito.mock(CompletableFuture.class); + phoenixPreparedStatement = new ParallelPhoenixPreparedStatement(context,future1,future2); + assertEquals(future1, phoenixPreparedStatement.getStatement1()); + } + + @Test + public void getStatement2() throws SQLException { + future1 = Mockito.mock(CompletableFuture.class); + future2 = Mockito.mock(CompletableFuture.class); + phoenixPreparedStatement = new ParallelPhoenixPreparedStatement(context,future1,future2); + assertEquals(future2, phoenixPreparedStatement.getStatement2()); + } + + @Test + public void executeQuery() throws SQLException, ExecutionException, InterruptedException { + ResultSet mockResultSet1 = Mockito.mock(ResultSet.class); + ResultSet mockResultSet2 = Mockito.mock(ResultSet.class); + + Mockito.when(statement1.executeQuery()).thenReturn(mockResultSet1); + Mockito.when(statement2.executeQuery()).thenReturn(mockResultSet2); + + ResultSet rs = phoenixPreparedStatement.executeQuery(); + + //TODO: make this less dependant on sleep + Thread.sleep(5000); + + Mockito.verify(statement1).executeQuery(); + Mockito.verify(statement2).executeQuery(); + ParallelPhoenixResultSet parallelRS = (ParallelPhoenixResultSet) rs; + assertEquals(mockResultSet1,parallelRS.getResultSetFuture1().get()); + assertEquals(mockResultSet2,parallelRS.getResultSetFuture2().get()); + } + + @Test + public void setInt() throws SQLException, ExecutionException, InterruptedException { + phoenixPreparedStatement.setInt(1,2); + + //TODO: make this less dependant on sleep + Thread.sleep(5000); Review Comment: setInt is asynchronous? ########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.jdbc; + +import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.ACTIVE; +import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.STANDBY; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; + +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.monitoring.GlobalClientMetrics; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An HighAvailabilityGroup provides a JDBC connection from given connection string and properties. + */ +enum HighAvailabilityPolicy { + FAILOVER { + @Override + public Connection provide(HighAvailabilityGroup haGroup, Properties info) + throws SQLException { + return new FailoverPhoenixConnection(haGroup, info); + } + @Override + void transitClusterRole(HighAvailabilityGroup haGroup, ClusterRoleRecord oldRecord, + ClusterRoleRecord newRecord) throws SQLException { + if (oldRecord.getRole1() == ACTIVE && newRecord.getRole1() == STANDBY) { + transitStandby(haGroup, oldRecord.getZk1()); + } + if (oldRecord.getRole2() == ACTIVE && newRecord.getRole2() == STANDBY) { + transitStandby(haGroup, oldRecord.getZk2()); + } + if (oldRecord.getRole1() != ACTIVE && newRecord.getRole1() == ACTIVE) { + transitActive(haGroup, oldRecord.getZk1()); + } + if (oldRecord.getRole2() != ACTIVE && newRecord.getRole2() == ACTIVE) { + transitActive(haGroup, oldRecord.getZk2()); + } + } + private void transitStandby(HighAvailabilityGroup haGroup, String zkUrl) + throws SQLException { + // Close connections when a previously ACTIVE HBase cluster becomes STANDBY. 
+ LOG.info("Cluster {} becomes STANDBY in HA group {}, now close all its connections", + zkUrl, haGroup.getGroupInfo()); + ConnectionQueryServices cqs = null; + try { + cqs = PhoenixDriver.INSTANCE.getConnectionQueryServices(zkUrl, + haGroup.getProperties()); + cqs.closeAllConnections(new SQLExceptionInfo + .Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER) + .setMessage("Phoenix connection got closed due to failover") + .setHaGroupInfo(haGroup.getGroupInfo().toString())); + LOG.info("Closed all connections to cluster {} for HA group {}", zkUrl, + haGroup.getGroupInfo()); + } finally { + if (cqs != null) { + // CQS is closed but it is not invalidated from global cache in PhoenixDriver + // so that any new connection will get error instead of creating a new CQS + LOG.info("Closing CQS after cluster '{}' becomes STANDBY", zkUrl); + cqs.close(); + LOG.info("Successfully closed CQS after cluster '{}' becomes STANDBY", zkUrl); + } + } + } + private void transitActive(HighAvailabilityGroup haGroup, String zkUrl) + throws SQLException { + // Invalidate CQS cache if any that has been closed but has not been cleared + PhoenixDriver.INSTANCE.invalidateCache(zkUrl, haGroup.getProperties()); + } + }, + + PARALLEL { + @Override + public Connection provide(HighAvailabilityGroup haGroup, Properties info) + throws SQLException { + List<Boolean> executorCapacities = PhoenixHAExecutorServiceProvider.hasCapacity(info); + if (executorCapacities.contains(Boolean.TRUE)) { + ParallelPhoenixContext context = + new ParallelPhoenixContext(info, haGroup, + PhoenixHAExecutorServiceProvider.get(info), executorCapacities); + return new ParallelPhoenixConnection(context); + } else { + // TODO: Once we have operation/primary wait timeout use the same + // Give regular connection or a failover connection? + LOG.warn("Falling back to single phoenix connection due to resource constraints"); + GlobalClientMetrics.GLOBAL_HA_PARALLEL_CONNECTION_FALLBACK_COUNTER.increment(); + return haGroup.connectActive(info); + } + } + @Override + void transitClusterRole(HighAvailabilityGroup haGroup, ClusterRoleRecord oldRecord, + ClusterRoleRecord newRecord) { + LOG.info("Cluster role changed for parallel HA policy."); Review Comment: why just a no-op? Because in PARALLEL everything's always ACTIVE/ACTIVE? But then why log that something changed? ########## phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java: ########## @@ -165,8 +165,30 @@ public enum MetricType { NUM_METADATA_LOOKUP_FAILURES("nmlf", "Number of Failed metadata lookup calls", LogLevel.DEBUG,PLong.INSTANCE), TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS("tsistrc", "Time spent in RPC calls for systemTable lookup", - LogLevel.DEBUG,PLong.INSTANCE); - + LogLevel.DEBUG,PLong.INSTANCE), + + //HA Related Metrics + HA_PARALLEL_COUNT_OPERATIONS_ACTIVE_CLUSTER("hpoac","Number of Operations to the active cluster",LogLevel.DEBUG,PLong.INSTANCE), + HA_PARALLEL_COUNT_OPERATIONS_STANDBY_CLUSTER("hposc","Number of Operations to the active cluster",LogLevel.DEBUG,PLong.INSTANCE), Review Comment: nit: description of standby metric says active ########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java: ########## @@ -0,0 +1,653 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.jdbc.ParallelPhoenixUtil.FutureResult; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import static org.apache.phoenix.exception.SQLExceptionCode.CLASS_NOT_UNWRAPPABLE; + +public class ParallelPhoenixConnection implements PhoenixMonitoredConnection { + + private static final Logger LOG = LoggerFactory.getLogger(ParallelPhoenixConnection.class); + + private final ParallelPhoenixContext context; + CompletableFuture<PhoenixConnection> futureConnection1; + CompletableFuture<PhoenixConnection> futureConnection2; + + public ParallelPhoenixConnection(ParallelPhoenixContext context) throws SQLException { + this.context = context; + LOG.debug("First Url: {} Second Url: {}", context.getHaGroup().getGroupInfo().getUrl1(), Review Comment: Should this be TRACE because this is a pretty frequent path? ########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixConnection.java: ########## @@ -0,0 +1,653 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.jdbc.ParallelPhoenixUtil.FutureResult; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import static org.apache.phoenix.exception.SQLExceptionCode.CLASS_NOT_UNWRAPPABLE; + +public class ParallelPhoenixConnection implements PhoenixMonitoredConnection { + + private static final Logger LOG = LoggerFactory.getLogger(ParallelPhoenixConnection.class); + + private final ParallelPhoenixContext context; + CompletableFuture<PhoenixConnection> futureConnection1; + CompletableFuture<PhoenixConnection> futureConnection2; + + public ParallelPhoenixConnection(ParallelPhoenixContext context) throws SQLException { + this.context = context; + LOG.debug("First Url: {} Second Url: {}", context.getHaGroup().getGroupInfo().getUrl1(), + context.getHaGroup().getGroupInfo().getUrl2()); + futureConnection1 = context.chainOnConn1(() -> getConnection(context.getHaGroup(), + context.getHaGroup().getGroupInfo().getUrl1(), + context.getProperties())); + futureConnection2 = context.chainOnConn2(() -> getConnection(context.getHaGroup(), + context.getHaGroup().getGroupInfo().getUrl2(), + context.getProperties())); + } + + @VisibleForTesting + ParallelPhoenixConnection(ParallelPhoenixContext context, CompletableFuture<PhoenixConnection> futureConnection1, CompletableFuture<PhoenixConnection> futureConnection2) { + this.context = context; + this.futureConnection1 = futureConnection1; + this.futureConnection2 = futureConnection2; + } + + private static PhoenixConnection getConnection(HighAvailabilityGroup haGroup, String url, Properties properties) { + try { + return haGroup.connectToOneCluster(url, properties); + } catch (SQLException exception) { + if (LOG.isTraceEnabled()) { + LOG.trace(String.format("Failed to get a connection for haGroup %s to %s", haGroup.toString(), url), exception); + } 
+ throw new CompletionException(exception); + } + } + + public CompletableFuture<PhoenixConnection> getFutureConnection1() { + return futureConnection1; + } + + public CompletableFuture<PhoenixConnection> getFutureConnection2() { + return futureConnection2; + } + + @VisibleForTesting + ParallelPhoenixContext getContext() { + return this.context; + } + + Object runOnConnections(Function<PhoenixConnection, ?> function, boolean useMetrics) throws SQLException { + return ParallelPhoenixUtil.INSTANCE.runFutures(function, futureConnection1, futureConnection2, context, useMetrics); + } + + PairOfSameType<Object> runOnConnectionsGetAll(Function<PhoenixConnection, ?> function, boolean useMetrics) throws SQLException { + return ParallelPhoenixUtil.INSTANCE.runOnFuturesGetAll(function, futureConnection1, futureConnection2, context, useMetrics); + } + + @Override + public ParallelPhoenixStatement createStatement() throws SQLException { + context.checkOpen(); + + Function<PhoenixConnection, PhoenixMonitoredStatement> function = (T) -> { + try { + return (PhoenixStatement) T.createStatement(); + } catch (SQLException exception) { + throw new CompletionException(exception); + } + }; + + List<CompletableFuture<PhoenixMonitoredStatement>> futures = ParallelPhoenixUtil.INSTANCE.applyFunctionToFutures(function, futureConnection1, + futureConnection2, context, true); + + Preconditions.checkState(futures.size() == 2); + CompletableFuture<PhoenixMonitoredStatement> statement1 = futures.get(0); + CompletableFuture<PhoenixMonitoredStatement> statement2 = futures.get(1); + + //Ensure one statement is successful before returning + ParallelPhoenixUtil.INSTANCE.runFutures(futures, context, true); + + return new ParallelPhoenixStatement(context, statement1, statement2); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + context.checkOpen(); + + Function<PhoenixConnection, PhoenixMonitoredPreparedStatement> function = (T) -> { + try { + return (PhoenixMonitoredPreparedStatement) T.prepareStatement(sql); + } catch (SQLException exception) { + throw new CompletionException(exception); + } + }; + + List<CompletableFuture<PhoenixMonitoredPreparedStatement>> futures = ParallelPhoenixUtil.INSTANCE.applyFunctionToFutures(function, futureConnection1, + futureConnection2, context, true); + + Preconditions.checkState(futures.size() == 2); + CompletableFuture<PhoenixMonitoredPreparedStatement> statement1 = futures.get(0); + CompletableFuture<PhoenixMonitoredPreparedStatement> statement2 = futures.get(1); + + //Ensure one statement is successful before returning + ParallelPhoenixUtil.INSTANCE.runFutures(futures, context, true); Review Comment: Let's say that cluster1 (which is active)'s prepared statement passes and cluster2's prepared statement fails, but then when actually running the statement, cluster1's query fails, but cluster 2 is now healthy but has a broken statement. What happens? ########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java: ########## @@ -0,0 +1,646 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.phoenix.exception.FailoverSQLException; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * An implementation of JDBC connection which supports failover between two cluster in an HA group. + * <p> + * During its lifetime, a failover Phoenix connection could possibly connect to two HBase clusters + * in an HA group mutually exclusively. It wraps and delegates the logic to a PhoenixConnection + * object. At any given time, the wrapped connection should only talk to the ACTIVE HBase cluster + * in the HA group. + * <p> + * A failover connection will behave according to the given failover policy upon cluster role + * failover, especially when the current connected HBase cluster becomes STANDBY role from ACTIVE. + * The default behavior (aka default failover policy) will simply close the current connection and + * throw {@link org.apache.phoenix.exception.FailoverSQLException} exception to those clients who + * still use this connection after closing. + * <p> + * This class is not thread safe. + * + * @see HighAvailabilityGroup + * @see FailoverPolicy + */ +public class FailoverPhoenixConnection implements PhoenixMonitoredConnection { + /** + * Failover timeout interval after which failover operation will fail and clients can retry. + */ + public static final String FAILOVER_TIMEOUT_MS_ATTR = "phoenix.ha.failover.timeout.ms"; + public static final long FAILOVER_TIMEOUT_MS_DEFAULT = 10_000; + private static final Logger LOG = LoggerFactory.getLogger(FailoverPhoenixConnection.class); + /** + * Connection properties. + */ + private final Properties properties; + /** + * High availability group. + */ + private final HighAvailabilityGroup haGroup; + /** + * Failover policy, per connection. + */ + private final FailoverPolicy policy; + + /** + * True iff this connection has been closed by the client. + */ + private boolean isClosed; + /** + * The wrapped PhoenixConnection object which could be re-assigned upon failover operation. + */ + private PhoenixConnection connection; + + /** + * Mutation metrics before failover to current connection. 
+ */ + private Map<String, Map<MetricType, Long>> previousMutationMetrics = new HashMap<>(); + /** + * Read metrics before failover to current connection. + */ + private Map<String, Map<MetricType, Long>> previousReadMetrics = new HashMap<>(); + + public FailoverPhoenixConnection(HighAvailabilityGroup haGroup, Properties properties) + throws SQLException { + this.properties = properties; + this.haGroup = haGroup; + this.policy = FailoverPolicy.get(properties); + this.isClosed = false; + this.connection = haGroup.connectActive(properties); + } + + /** + * This is used for explicit failover request made by client. + * <p> + * It fails over to the current ACTIVE HBase cluster; if failover happens in between, this could + * possibly target this same cluster again. + * <p> + * TODO: If there are two ACTIVE clusters, then switch to the other ACTIVE cluster. + * + * @param conn if not of FailoverPhoenixConnection type, throw illegal argument exception + * @param timeoutMs timeout in milliseconds to failover to current active cluster + * @throws SQLException if fails to failover + */ + public static void failover(Connection conn, long timeoutMs) throws SQLException { + Preconditions.checkNotNull(conn, "Connection to failover must not be null!"); + FailoverPhoenixConnection failoverConnection = conn.unwrap(FailoverPhoenixConnection.class); + if (failoverConnection == null) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE) + .setMessage("Connection is not a valid FailoverPhoenixConnection object") + .build() + .buildException(); + } + failoverConnection.failover(timeoutMs); + } + + /** + * Helper method to merge two metrics map into one. + * <p> + * Shallow copy the first one, and deep copy the second one. + * An optimization is that, it will return the shallow directly if the deep is empty. + */ + private static Map<String, Map<MetricType, Long>> mergeMetricMaps( + Map<String, Map<MetricType, Long>> shallow, Map<String, Map<MetricType, Long>> deep) { + if (deep.isEmpty()) { + return shallow; + } + + Map<String, Map<MetricType, Long>> metrics = new HashMap<>(shallow); + deep.forEach((k, v) -> { + metrics.putIfAbsent(k, new HashMap<>()); + Map<MetricType, Long> map = metrics.get(k); + v.forEach((kk, vv) -> { + Long value = map.getOrDefault(kk, 0L); + map.put(kk, value + vv); + }); + }); + return metrics; + } + + /** + * Failover this connection by switching underlying phoenix connection to the ACTIVE one. + * <p> + * If the current phoenix connection is already connecting to ACTIVE cluster, this is a no-op. + * + * @param timeoutMs timeout in ms waiting for a new connection to be established. 
+ * @throws SQLException if fails to failover + */ + @VisibleForTesting + void failover(long timeoutMs) throws SQLException { + checkConnection(); + + if (haGroup.isActive(connection)) { + LOG.info("Connection {} is against ACTIVE cluster in HA group {}; skip failing over.", + connection.getURL(), haGroup.getGroupInfo().getName()); + return; + } + + PhoenixConnection newConn = null; + SQLException cause = null; + final long startTime = EnvironmentEdgeManager.currentTimeMillis(); + while (newConn == null && + EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs) { + try { + newConn = haGroup.connectActive(properties); + } catch (SQLException e) { + cause = e; + LOG.info("Got exception when trying to connect to active cluster.", e); + try { + Thread.sleep(100); // TODO: be smart than this + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new SQLException("Got interrupted waiting for connection failover", e); + } + } + } + if (newConn == null) { + throw new FailoverSQLException("Can not failover connection", + haGroup.getGroupInfo().toString(), cause); + } + + final PhoenixConnection oldConn = connection; + connection = newConn; + if (oldConn != null) { + // aggregate metrics + previousMutationMetrics = oldConn.getMutationMetrics(); + previousReadMetrics = oldConn.getReadMetrics(); + oldConn.clearMetrics(); + + // close old connection + if (!oldConn.isClosed()) { + // TODO: what happens to in-flight edits/mutations? + // Can we copy into the new connection we do not allow this failover? + // MutationState state = oldConn.getMutationState(); Review Comment: We'd want to look _really_ carefully at the thread-safe implications of taking the uncommitted mutation state out of a connection and putting it into another one. Definitely seems good to defer this TODO for now. ########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java: ########## @@ -0,0 +1,646 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + +import org.apache.phoenix.exception.FailoverSQLException; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * An implementation of JDBC connection which supports failover between two cluster in an HA group. + * <p> + * During its lifetime, a failover Phoenix connection could possibly connect to two HBase clusters + * in an HA group mutually exclusively. It wraps and delegates the logic to a PhoenixConnection + * object. At any given time, the wrapped connection should only talk to the ACTIVE HBase cluster + * in the HA group. + * <p> + * A failover connection will behave according to the given failover policy upon cluster role + * failover, especially when the current connected HBase cluster becomes STANDBY role from ACTIVE. + * The default behavior (aka default failover policy) will simply close the current connection and + * throw {@link org.apache.phoenix.exception.FailoverSQLException} exception to those clients who + * still use this connection after closing. + * <p> + * This class is not thread safe. + * + * @see HighAvailabilityGroup + * @see FailoverPolicy + */ +public class FailoverPhoenixConnection implements PhoenixMonitoredConnection { + /** + * Failover timeout interval after which failover operation will fail and clients can retry. + */ + public static final String FAILOVER_TIMEOUT_MS_ATTR = "phoenix.ha.failover.timeout.ms"; + public static final long FAILOVER_TIMEOUT_MS_DEFAULT = 10_000; + private static final Logger LOG = LoggerFactory.getLogger(FailoverPhoenixConnection.class); + /** + * Connection properties. + */ + private final Properties properties; + /** + * High availability group. + */ + private final HighAvailabilityGroup haGroup; + /** + * Failover policy, per connection. + */ + private final FailoverPolicy policy; + + /** + * True iff this connection has been closed by the client. + */ + private boolean isClosed; + /** + * The wrapped PhoenixConnection object which could be re-assigned upon failover operation. + */ + private PhoenixConnection connection; + + /** + * Mutation metrics before failover to current connection. + */ + private Map<String, Map<MetricType, Long>> previousMutationMetrics = new HashMap<>(); + /** + * Read metrics before failover to current connection. 
+ */ + private Map<String, Map<MetricType, Long>> previousReadMetrics = new HashMap<>(); + + public FailoverPhoenixConnection(HighAvailabilityGroup haGroup, Properties properties) + throws SQLException { + this.properties = properties; + this.haGroup = haGroup; + this.policy = FailoverPolicy.get(properties); + this.isClosed = false; + this.connection = haGroup.connectActive(properties); + } + + /** + * This is used for explicit failover request made by client. + * <p> + * It fails over to the current ACTIVE HBase cluster; if failover happens in between, this could + * possibly target this same cluster again. + * <p> + * TODO: If there are two ACTIVE clusters, then switch to the other ACTIVE cluster. + * + * @param conn if not of FailoverPhoenixConnection type, throw illegal argument exception + * @param timeoutMs timeout in milliseconds to failover to current active cluster + * @throws SQLException if fails to failover + */ + public static void failover(Connection conn, long timeoutMs) throws SQLException { + Preconditions.checkNotNull(conn, "Connection to failover must not be null!"); + FailoverPhoenixConnection failoverConnection = conn.unwrap(FailoverPhoenixConnection.class); + if (failoverConnection == null) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE) + .setMessage("Connection is not a valid FailoverPhoenixConnection object") + .build() + .buildException(); + } + failoverConnection.failover(timeoutMs); + } + + /** + * Helper method to merge two metrics map into one. + * <p> + * Shallow copy the first one, and deep copy the second one. + * An optimization is that, it will return the shallow directly if the deep is empty. + */ + private static Map<String, Map<MetricType, Long>> mergeMetricMaps( + Map<String, Map<MetricType, Long>> shallow, Map<String, Map<MetricType, Long>> deep) { + if (deep.isEmpty()) { + return shallow; + } + + Map<String, Map<MetricType, Long>> metrics = new HashMap<>(shallow); + deep.forEach((k, v) -> { + metrics.putIfAbsent(k, new HashMap<>()); + Map<MetricType, Long> map = metrics.get(k); + v.forEach((kk, vv) -> { + Long value = map.getOrDefault(kk, 0L); + map.put(kk, value + vv); + }); + }); + return metrics; + } + + /** + * Failover this connection by switching underlying phoenix connection to the ACTIVE one. + * <p> + * If the current phoenix connection is already connecting to ACTIVE cluster, this is a no-op. + * + * @param timeoutMs timeout in ms waiting for a new connection to be established. 
+ * @throws SQLException if fails to failover + */ + @VisibleForTesting + void failover(long timeoutMs) throws SQLException { + checkConnection(); + + if (haGroup.isActive(connection)) { + LOG.info("Connection {} is against ACTIVE cluster in HA group {}; skip failing over.", + connection.getURL(), haGroup.getGroupInfo().getName()); + return; + } + + PhoenixConnection newConn = null; + SQLException cause = null; + final long startTime = EnvironmentEdgeManager.currentTimeMillis(); + while (newConn == null && + EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs) { + try { + newConn = haGroup.connectActive(properties); + } catch (SQLException e) { + cause = e; + LOG.info("Got exception when trying to connect to active cluster.", e); + try { + Thread.sleep(100); // TODO: be smart than this + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new SQLException("Got interrupted waiting for connection failover", e); + } + } + } + if (newConn == null) { + throw new FailoverSQLException("Can not failover connection", + haGroup.getGroupInfo().toString(), cause); + } + + final PhoenixConnection oldConn = connection; + connection = newConn; + if (oldConn != null) { + // aggregate metrics + previousMutationMetrics = oldConn.getMutationMetrics(); + previousReadMetrics = oldConn.getReadMetrics(); + oldConn.clearMetrics(); + + // close old connection + if (!oldConn.isClosed()) { + // TODO: what happens to in-flight edits/mutations? + // Can we copy into the new connection we do not allow this failover? + // MutationState state = oldConn.getMutationState(); + try { + oldConn.close(new SQLExceptionInfo + .Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER) + .setMessage("Phoenix connection got closed due to failover") + .setHaGroupInfo(haGroup.getGroupInfo().toString()) + .build() + .buildException()); + } catch (SQLException e) { + LOG.error("Failed to close old connection after failover: {}", e.getMessage()); + LOG.info("Full stack when closing old connection after failover", e); + } + } + } + LOG.info("Connection {} failed over to {}", haGroup.getGroupInfo(), connection.getURL()); + } + + /** + * Connection can not be null before any operation. + * <p> + * Here when connection is non-null, we do not need to check if the wrapped connection is open. + * The reason is that each individual delegated call on the wrapped connection will internally + * check open itself, see {@link PhoenixConnection#checkOpen()}. 
+ * + * @throws SQLException if current wrapped phoenix connection is not valid state + */ + private void checkConnection() throws SQLException { + if (isClosed) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CONNECTION_CLOSED) + .setHaGroupInfo(haGroup.getGroupInfo().toString()) + .build() + .buildException(); + } + if (connection == null) { + throw new SQLExceptionInfo + .Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION) + .setMessage("Connection has not been established to ACTIVE HBase cluster") + .setHaGroupInfo(haGroup.getGroupInfo().toString()) + .build() + .buildException(); + } + } + + @Override + public void close() throws SQLException { + if (isClosed()) { + return; + } + + try { + connection.close(); + connection.clearMetrics(); + } finally { + previousMutationMetrics.clear(); + previousReadMetrics.clear(); + isClosed = true; + } + } + + @Override + public boolean isClosed() { + return isClosed; + } + + //// metrics for monitoring methods + + @SuppressWarnings("unchecked") + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + if (!iface.isInstance(this)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE) + .setMessage(getClass().getName() + " not unwrappable from " + iface.getName()) + .build() + .buildException(); + } + return (T) this; + } + + @Override + public Map<String, Map<MetricType, Long>> getMutationMetrics() { + return mergeMetricMaps(connection.getMutationMetrics(), previousMutationMetrics); + } + + @Override + public Map<String, Map<MetricType, Long>> getReadMetrics() { + return mergeMetricMaps(connection.getReadMetrics(), previousReadMetrics); + } + + @Override + public boolean isRequestLevelMetricsEnabled() { + return connection != null && connection.isRequestLevelMetricsEnabled(); + } + + @Override + public void clearMetrics() { + previousMutationMetrics.clear(); + previousReadMetrics.clear(); + if (connection != null) { + connection.clearMetrics(); + } + } + + //// Wrapping phoenix connection operations + + /** + * This is the utility method to help wrapping a method call to phoenix connection. + * + * @param s the supplier which returns a value and may throw SQLException + * @param <T> type of the returned object by the supplier + * @return the object returned by the supplier if any + * @throws SQLException exception when getting object from the supplier + */ + @VisibleForTesting + <T> T wrapActionDuringFailover(SupplierWithSQLException<T> s) throws SQLException { + checkConnection(); + final long timeoutMs = Long.parseLong(properties.getProperty(FAILOVER_TIMEOUT_MS_ATTR, + String.valueOf(FAILOVER_TIMEOUT_MS_DEFAULT))); + int failoverCount = 0; + while (true) { + try { + return s.get(); + } catch (SQLException e) { + if (policy.shouldFailover(e, ++failoverCount)) { + failover(timeoutMs); + } else { + throw new SQLException( + String.format("Error on operation with failover policy %s", policy), e); + } + } + } + } + + @VisibleForTesting + void wrapActionDuringFailover(RunWithSQLException runnable) throws SQLException { + wrapActionDuringFailover(() -> { + runnable.run(); + return null; + }); + } + + @Override + public void commit() throws SQLException { + wrapActionDuringFailover(() -> connection.commit()); Review Comment: Really clever way of wrapping the Connection API...I see now why a Java 7 port wasn't feasible. 
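For readers following the comment above: a minimal, illustrative sketch (not the PR's exact code) of how the lambda-based wrapping composes. Without lambdas, each delegated java.sql.Connection method would need its own anonymous inner class, which is the reviewer's point about a Java 7 port. FailoverPolicy, failover(), and the wrapped PhoenixConnection are the members shown in the diff; here they are reduced to stubs so the retry shape stands on its own.

// Sketch only -- simplified from the diff above; Policy and timeoutMs are stand-ins.
import java.sql.SQLException;

class FailoverWrappingSketch {
    @FunctionalInterface interface SupplierWithSQLException<T> { T get() throws SQLException; }
    @FunctionalInterface interface RunWithSQLException { void run() throws SQLException; }
    interface Policy { boolean shouldFailover(Exception e, int failoverCount); }

    private final Policy policy = (e, count) -> count < 3;  // stand-in for FailoverPolicy
    private final long timeoutMs = 10_000L;                 // stand-in for the configured failover timeout

    private void failover(long timeoutMs) throws SQLException {
        // in the PR this swaps the wrapped PhoenixConnection for one against the ACTIVE cluster
    }

    <T> T wrapActionDuringFailover(SupplierWithSQLException<T> action) throws SQLException {
        int failoverCount = 0;
        while (true) {
            try {
                return action.get();                         // delegate to the wrapped connection
            } catch (SQLException e) {
                if (policy.shouldFailover(e, ++failoverCount)) {
                    failover(timeoutMs);                     // then retry the same lambda
                } else {
                    throw e;                                 // e.g. explicit policy: caller handles failover
                }
            }
        }
    }

    void wrapActionDuringFailover(RunWithSQLException runnable) throws SQLException {
        wrapActionDuringFailover(() -> { runnable.run(); return null; });
    }

    // With lambdas, each Connection method collapses to a one-liner, e.g.:
    // public void commit() throws SQLException { wrapActionDuringFailover(() -> connection.commit()); }
}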
########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.jdbc; + +import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.ACTIVE; +import static org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole.STANDBY; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; + +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.monitoring.GlobalClientMetrics; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An HighAvailabilityGroup provides a JDBC connection from given connection string and properties. + */ +enum HighAvailabilityPolicy { + FAILOVER { + @Override + public Connection provide(HighAvailabilityGroup haGroup, Properties info) + throws SQLException { + return new FailoverPhoenixConnection(haGroup, info); + } + @Override + void transitClusterRole(HighAvailabilityGroup haGroup, ClusterRoleRecord oldRecord, + ClusterRoleRecord newRecord) throws SQLException { + if (oldRecord.getRole1() == ACTIVE && newRecord.getRole1() == STANDBY) { Review Comment: Should there be some protection against a newRecord with {STANDBY, STANDBY} (or {ACTIVE, ACTIVE} in a cluster pair in failover mode)) ########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixResultSet.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.jdbc; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.monitoring.MetricType; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Function; + +import static org.apache.phoenix.exception.SQLExceptionCode.CLASS_NOT_UNWRAPPABLE; + +public class ParallelPhoenixResultSet extends DelegateResultSet implements PhoenixMonitoredResultSet { Review Comment: What's the distinction between this and the ParallelPhoenixNullComparingResultSet? When would you use one over the other? ########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/ParallelPhoenixContext.java: ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.apache.phoenix.jdbc.ParallelPhoenixUtil.PHOENIX_HA_PARALLEL_OPERATION_TIMEOUT_ATTRIB; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_CONNECTION_CREATED_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_CONNECTION_ERROR_COUNTER; +import static org.apache.phoenix.query.QueryServices.AUTO_COMMIT_ATTRIB; + +/** + * ParallelPhoenixContext holds the state of hte execution of a parallel phoenix operation as well as metrics. Review Comment: tiny nit: typo on "the" ########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAExecutorServiceProvider.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL1_TASK_END_TO_END_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL1_TASK_EXECUTED_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL1_TASK_EXECUTION_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL1_TASK_QUEUE_WAIT_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL1_TASK_REJECTED_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_END_TO_END_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_EXECUTED_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_EXECUTION_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_QUEUE_WAIT_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_REJECTED_COUNTER; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.phoenix.monitoring.GlobalClientMetrics; +import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; + +/** + * Provides a bounded and configurable executor service for {@link ParallelPhoenixConnection} and + * related infra. 
Provides a lazily initialized singleton executor service to be used for all + * ParallelPhoenixConnections Also provides visibility into the capacity of ExecutorService + * {@link PhoenixHAExecutorServiceProvider#hasCapacity(Properties)} + */ +public class PhoenixHAExecutorServiceProvider { + + public static final String HA_MAX_POOL_SIZE = "phoenix.ha.max.pool.size"; + public static final String DEFAULT_HA_MAX_POOL_SIZE = "30"; + public static final String HA_MAX_QUEUE_SIZE = "phoenix.ha.max.queue.size"; + public static final String DEFAULT_HA_MAX_QUEUE_SIZE = "300"; + public static final String HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD = + "phoenix.ha.threadpool.queue.backoff.threshold"; + public static final String DEFAULT_HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD = "0.9"; + + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixHAExecutorServiceProvider.class); + + // Can make configurable if needed + private static final int KEEP_ALIVE_TIME_SECONDS = 120; + + private static volatile List<ExecutorService> INSTANCE = null; + + private PhoenixHAExecutorServiceProvider() { + } + + public static List<ExecutorService> get(Properties properties) { + if (INSTANCE == null) { + synchronized (PhoenixHAExecutorServiceProvider.class) { + if (INSTANCE == null) { + INSTANCE = initThreadPool(properties); + } + } + } + return INSTANCE; + } + + @VisibleForTesting + static synchronized void resetExecutor() { + INSTANCE = null; + } + + /** + * Checks if the underlying executorServices have sufficient available capacity based on + * {@link #HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD}. Monitors the capacity of the blockingqueues + * linked with the executor services. + * @param properties + * @return true if queue is less than {@link #HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD} full + */ + public static List<Boolean> hasCapacity(Properties properties) { + if (INSTANCE == null) { + return ImmutableList.of(Boolean.TRUE, Boolean.TRUE); + } + double backoffThreshold = + Double.parseDouble(properties.getProperty(HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD, + DEFAULT_HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD)); + int i = 0; + List<Boolean> executorCapacities = new ArrayList<>(); + for (ExecutorService executor : INSTANCE) { + double queueSize = ((ThreadPoolExecutor) executor).getQueue().size(); + double queueRemainingCapacity = + ((ThreadPoolExecutor) executor).getQueue().remainingCapacity(); + double queueCapacity = queueSize + queueRemainingCapacity; + boolean hasCapacity = ((queueSize / queueCapacity) < backoffThreshold); + if (hasCapacity) { + if (LOGGER.isDebugEnabled()) { + LOGGER.info("PhoenixHAExecutorServiceProvider ThreadPoolExecutor[" + i Review Comment: isDebugEnabled() wrapping an info() call. Also, won't this be very frequent even for a DEBUG? ########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java: ########## @@ -0,0 +1,871 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import com.sun.istack.NotNull; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.ZKPaths; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.thirdparty.com.google.common.cache.Cache; +import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.phoenix.util.JDBCUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION; + +/** + * An high availability (HA) group is an association between a pair of HBase clusters, a group of + * clients, and an HA policy. + * <p> + * This class is thread safe. Multiple threads may access an instance of this class, including + * multiple clients that call init in order to create a connection, two cluster role managers that + * watches node changes in ZooKeeper. + * <p> + * The lifecycle of an HA group is confined in the global cache, meaning clients can get an instance + * from the cache but cannot construct or close an HA group instance. The reason is that HA group + * is a shared resource by many clients. Closing it intentionally or accidentally by a client will + * impact other connections in this group with unexpected behavior. 
+ */ +@SuppressWarnings("UnstableApiUsage") +public class HighAvailabilityGroup { + public static final String PHOENIX_HA_ATTR_PREFIX = "phoenix.ha."; + public static final String PHOENIX_HA_GROUP_ATTR = PHOENIX_HA_ATTR_PREFIX + "group.name"; + /** + * Should we fall back to single cluster when cluster role record is missing? + */ + public static final String PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY = + PHOENIX_HA_ATTR_PREFIX + "fallback.enabled"; + public static final String PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_DEFAULT = + String.valueOf(Boolean.TRUE); + /** + * The single-cluster connection URL when it needs to fall back. + */ + public static final String PHOENIX_HA_FALLBACK_CLUSTER_KEY = + PHOENIX_HA_ATTR_PREFIX + "fallback.cluster"; + public static final String PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE = + "phoenix" + ZKPaths.PATH_SEPARATOR + "ha"; + + public static final String PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY = + PHOENIX_HA_ATTR_PREFIX + "zk.connection.timeout.ms"; + public static final int PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT = 4_000; + public static final String PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY = + PHOENIX_HA_ATTR_PREFIX + "zk.session.timeout.ms"; + public static final int PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_DEFAULT = 4_000; + public static final String PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_KEY = + PHOENIX_HA_ATTR_PREFIX + "zk.retry.base.sleep.ms"; + + public static final int PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_DEFAULT = 1000; + public static final String PHOENIX_HA_ZK_RETRY_MAX_KEY = + PHOENIX_HA_ATTR_PREFIX + "zk.retry.max"; + public static final int PHOENIX_HA_ZK_RETRY_MAX_DEFAULT = 5; + public static final String PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY = + PHOENIX_HA_ATTR_PREFIX + "zk.retry.max.sleep.ms"; + public static final int PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_DEFAULT = 10_000; + public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry( + PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_DEFAULT, + PHOENIX_HA_ZK_RETRY_MAX_DEFAULT, + PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_DEFAULT); + + public static final String PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY = + PHOENIX_HA_ATTR_PREFIX + "transition.timeout.ms"; + public static final long PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000; // 5 mins + + static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityGroup.class); + @VisibleForTesting + static final Map<HAGroupInfo, HighAvailabilityGroup> GROUPS = new ConcurrentHashMap<>(); + @VisibleForTesting + static final Cache<HAGroupInfo, Boolean> MISSING_CRR_GROUPS_CACHE = CacheBuilder.newBuilder() + .expireAfterWrite(PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT, TimeUnit.MILLISECONDS) + .build(); + /** + * The Curator client cache, one client instance per cluster. + */ + @VisibleForTesting + static final Cache<String, CuratorFramework> CURATOR_CACHE = CacheBuilder.newBuilder() + .expireAfterAccess(DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION, TimeUnit.MILLISECONDS) + .removalListener((notification) -> + ((CuratorFramework) Objects.requireNonNull(notification.getValue())).close()) + .build(); + /** + * High availability group info. + */ + private final HAGroupInfo info; + /** + * Client properties used to initialize this HA group. + */ + private final Properties properties; + /** + * Executor service for the two role managers. 
+ */ + private final ExecutorService roleManagerExecutor = Executors.newFixedThreadPool(2, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("phoenixHAGroup-%d").build()); + /** + * The count down latch to make sure at least one role manager has pulled data from ZK. + */ + private final CountDownLatch roleManagerLatch = new CountDownLatch(1); + /** + * Pair of role managers for watching cluster role records from the two ZK clusters. + */ + private final AtomicReference<PairOfSameType<HAClusterRoleManager>> roleManagers + = new AtomicReference<>(); + /** + * Executor for applying the cluster role to this HA group. + */ + private final ExecutorService nodeChangedExecutor = Executors.newFixedThreadPool(1); + /** + * Current cluster role record for this HA group. + */ + private volatile ClusterRoleRecord roleRecord; + /** + * State of this HA group. + */ + private volatile State state = State.UNINITIALIZED; + + /** + * Private constructor. + * <p> + * To get an instance, please call {@link HighAvailabilityGroup#get(String, Properties)}. + */ + private HighAvailabilityGroup(HAGroupInfo info, Properties properties) { + this.info = info; + this.properties = properties; + } + /** + * This is for test usage only. In production, the record should be retrieved from ZooKeeper. + */ + @VisibleForTesting + HighAvailabilityGroup(HAGroupInfo info, Properties properties, ClusterRoleRecord record, + State state) { + this.info = info; + this.properties = properties; + this.roleRecord = record; + this.state = state; + } + + public static HAGroupInfo getHAGroupInfo(String url, Properties properties) + throws SQLException { + if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) { + url = url.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1); + } + if (!(url.contains("[") && url.contains("|") && url.contains("]"))) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL) + .setMessage(String.format("URL %s is not a valid HA connection string", url)) + .build() + .buildException(); + } + url = url.substring(url.indexOf("[") + 1, url.indexOf("]")); + String[] urls = url.split("\\|"); + + String name = properties.getProperty(PHOENIX_HA_GROUP_ATTR); + if (StringUtils.isEmpty(name)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.HA_INVALID_PROPERTIES) + .setMessage(String.format("HA group name can not be empty for HA URL %s", url)) + .build() + .buildException(); + } + return new HAGroupInfo(name, urls[0], urls[1]); + } + + /** + * Get an instance of HA group given the HA connecting URL (with "|") and client properties. + * <p> + * The HA group does not have a public constructor. This method is the only public one for + * getting an HA group instance. The reason is that, HA group is considered expensive to create + * and maintain. Caching it will make it reusable for all connection requests to this group. + * <p> + * It will return the cached instance, if any, for the target HA group. The HA group creation + * and initialization are blocking operations. Upon initialization failure, the HA group + * information may be saved in a negative cache iff the cause is due to missing cluster role + * records. In presence of empty (not null or exception) return value, client may choose to fall + * back to a single cluster connection to compensate missing cluster role records. 
+ * + * @return Optional of target HA group (initialized), or empty if missing cluster role records + * @throws SQLException fails to get or initialize an HA group + */ + public static Optional<HighAvailabilityGroup> get(String url, Properties properties) + throws SQLException { + HAGroupInfo info = getHAGroupInfo(url, properties); + if (MISSING_CRR_GROUPS_CACHE.getIfPresent(info) != null) { + return Optional.empty(); + } + + HighAvailabilityGroup haGroup = GROUPS.computeIfAbsent(info, + haGroupInfo -> new HighAvailabilityGroup(haGroupInfo, properties)); + try { + haGroup.init(); + } catch (Exception e) { + GROUPS.remove(info); + haGroup.close(); + try { + CuratorFramework curator1 = CURATOR_CACHE.getIfPresent(info.getUrl1()); + CuratorFramework curator2 = CURATOR_CACHE.getIfPresent(info.getUrl2()); + if (curator1 != null && curator2 != null) { + Stat node1 = curator1.checkExists().forPath(info.getZkPath()); + Stat node2 = curator2.checkExists().forPath(info.getZkPath()); + if (node1 == null && node2 == null) { + // The HA group fails to initialize due to missing cluster role records on + // both ZK clusters. We will put this HA group into negative cache. + MISSING_CRR_GROUPS_CACHE.put(info, true); + return Optional.empty(); + } + } + } catch (Exception e2) { + LOG.error("HA group {} failed to initialized. Got exception when checking if znode" + + " exists on the two ZK clusters.", info, e2); + } + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION) + .setMessage(String.format("Cannot start HA group %s for URL %s", haGroup, url)) + .setRootCause(e) + .build() + .buildException(); + } + return Optional.of(haGroup); + } + + /** + * This method helps client to get the single cluster to fallback. + * <p> + * When getting HA group using {@link #get(String, Properties)}, it may return empty (not null + * or exception) value. In that case client may choose to fall back to a single cluster + * connection to compensate missing cluster role records instead of throw errors. + * + * @param url The HA connection url optionally; empty optional if properties disables fallback + * @param properties The client connection properties + * @return The connection url of the single cluster to fall back + * @throws SQLException if fails to get HA information and/or invalid properties are seen + */ + static Optional<String> getFallbackCluster(String url, Properties properties) throws SQLException { + HAGroupInfo haGroupInfo = getHAGroupInfo(url, properties); + + String fallback = properties.getProperty(PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY, + PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_DEFAULT); + if (!Boolean.parseBoolean(fallback)) { + LOG.info("Fallback to single cluster not enabled for the HA group {} per configuration." + + " HA url: '{}'.", haGroupInfo.getName(), url); + return Optional.empty(); + } + String fallbackCluster = properties.getProperty(PHOENIX_HA_FALLBACK_CLUSTER_KEY); + if (StringUtils.isEmpty(fallbackCluster)) { + fallbackCluster = haGroupInfo.getUrl1(); + } + LOG.info("Falling back to single cluster '{}' for the HA group {} to serve HA connection " + + "request against url '{}'.", + fallbackCluster, haGroupInfo.getName(), url); + return Optional.of(fallbackCluster); + } + + /** + * Get an active curator ZK client for the given properties and ZK endpoint. + * <p> + * This can be from cached object since Curator should be shared per cluster. 
+ * + * @param jdbcUrl the ZK endpoint host:port or the JDBC connection String host:port:/hbase + * @param properties the properties defining time out values and retry count + * @return a new Curator framework client + */ + @SuppressWarnings("UnstableApiUsage") + public static CuratorFramework getCurator(String jdbcUrl, Properties properties) + throws IOException { + try { + return CURATOR_CACHE.get(jdbcUrl, () -> { + CuratorFramework curator = createCurator(jdbcUrl, properties); + if (!curator.blockUntilConnected(PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT, + TimeUnit.MILLISECONDS)) + throw new RuntimeException("Failed to connect to the CuratorFramework in " + + "timeout " + PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT + " ms"); + return curator; + }); + } catch (Exception e) { + LOG.error("Fail to get an active curator for url {}", jdbcUrl, e); + // invalidate the cache when getting/creating throws exception + CURATOR_CACHE.invalidate(jdbcUrl); + throw new IOException(e); + } + } + + /** + * Create a curator ZK client for the given properties and ZK endpoint. + * <p> + * Unless caller needs a new curator, it should use {@link #getCurator(String, Properties)}. + */ + private static CuratorFramework createCurator(String jdbcUrl, Properties properties) { + // Get the ZK endpoint in host:port format by removing JDBC protocol and HBase root node + final String zkUrl; + if (jdbcUrl.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) { + jdbcUrl = jdbcUrl.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1); + } + Preconditions.checkArgument(!StringUtils.isEmpty(jdbcUrl), "JDBC url is empty!"); + String[] urls = jdbcUrl.split(":"); + if (urls.length == 1) { + zkUrl = String.format("%s:%s", urls[0], HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); + } else if (urls.length == 2 || urls.length == 3) { + zkUrl = String.format("%s:%s", urls[0], urls[1]); + } else { + throw new IllegalArgumentException("Invalid JDBC url!" + jdbcUrl); + } + + // Get timeout and retry counts + String connectionTimeoutMsProp = properties.getProperty( + PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY); + final int connectionTimeoutMs = !StringUtils.isEmpty(connectionTimeoutMsProp) + ? Integer.parseInt(connectionTimeoutMsProp) + : PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT; + String sessionTimeoutMsProps = properties.getProperty(PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY); + final int sessionTimeoutMs = !StringUtils.isEmpty(sessionTimeoutMsProps) + ? Integer.parseInt(sessionTimeoutMsProps) + : PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_DEFAULT; + final RetryPolicy retryPolicy = createRetryPolicy(properties); + + CuratorFramework curator = CuratorFrameworkFactory + .builder() + .connectString(zkUrl) + .namespace(PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE) + .connectionTimeoutMs(connectionTimeoutMs) + .sessionTimeoutMs(sessionTimeoutMs) + .retryPolicy(retryPolicy) + .canBeReadOnly(true) + .build(); + curator.start(); + return curator; + } + + /** + * Create a Curator retry policy from properties. + * <p> + * If properties is null, return a default retry policy. + * + * @param properties properties defining timeout and max retries + * @return a retry policy which can be used for Curator operations + */ + public static RetryPolicy createRetryPolicy(Properties properties) { + if (properties == null) { + return RETRY_POLICY; + } + String baseSleepTimeMsProp = properties.getProperty(PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_KEY); + int baseSleepTimeMs = StringUtils.isNotEmpty(baseSleepTimeMsProp) + ? 
Integer.parseInt(baseSleepTimeMsProp) + : PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_DEFAULT; + String maxRetriesProp = properties.getProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY); + int maxRetries = StringUtils.isNotEmpty(maxRetriesProp) + ? Integer.parseInt(maxRetriesProp) + : PHOENIX_HA_ZK_RETRY_MAX_DEFAULT; + String maxSleepTimeMsProp = properties.getProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY); + int maxSleepTimeMs = StringUtils.isNotEmpty(maxSleepTimeMsProp) + ? Integer.parseInt(maxSleepTimeMsProp) + : PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_DEFAULT; + return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries, maxSleepTimeMs); + } + + /** + * Initialize this HA group by registering ZK watchers and getting initial cluster role record. + * <p> + * If this is already initialized, calling this method is a no-op. This method is lock free as + * current thread will either return fast or wait for the in-progress initialization or timeout. + */ + public void init() throws IOException { + if (state != State.UNINITIALIZED) { + return; + } + + PairOfSameType<HAClusterRoleManager> newRoleManagers = new PairOfSameType<>( + new HAClusterRoleManager(info.urls.getFirst(), properties), + new HAClusterRoleManager(info.urls.getSecond(), properties)); + if (!roleManagers.compareAndSet(null, newRoleManagers)) { + LOG.info("Someone already started role managers; waiting for that one..."); + waitForInitialization(properties); + return; + } + + Future<?> f1 = roleManagerExecutor.submit(newRoleManagers.getFirst()); + Future<?> f2 = roleManagerExecutor.submit(newRoleManagers.getSecond()); + try { + waitForInitialization(properties); + } catch (IOException e) { + // HA group that fails to initialize will not be kept in the global cache. + // Next connection request will create and initialize a new HA group. + // Before returning in case of exception, following code will cancel the futures. + f1.cancel(true); + f2.cancel(true); + throw e; + } + + assert roleRecord != null; + LOG.info("Initial cluster role for HA group {} is {}", info, roleRecord); + } + + /** + * Helper method that will block current thread until the HA group is initialized. + * <p> + * After returning, the HA state might not be in READY state. That is possible when a new ZK + * node change is detected triggering HA group to become IN_TRANSIT state. + * + * @param properties the connection properties + * @throws IOException when current HA group is not initialized before timeout + */ + private void waitForInitialization(Properties properties) throws IOException { + String connectionTimeoutMsProp = properties.getProperty( + PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY); + int timeout = !StringUtils.isEmpty(connectionTimeoutMsProp) + ? Integer.parseInt(connectionTimeoutMsProp) + : PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT; + boolean started = false; + try { + started = roleManagerLatch.await(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.warn("Got interrupted when waiting for cluster role managers to start", e); + Thread.currentThread().interrupt(); + } + if (!started) { + LOG.warn("Timed out {}ms waiting for HA group '{}' to be initialized.", timeout, info); + throw new IOException("Fail to initialize HA group " + info); + } + } + + /** + * Create a JDBC connection in this high availability group. 
+ * + * @param properties connection properties + * @return a JDBC connection implementation + * @throws SQLException if fails to connect a JDBC connection + */ + public Connection connect(Properties properties) throws SQLException { + if (state != State.READY) { + throw new SQLExceptionInfo + .Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION) + .setMessage("HA group is not ready!") + .setHaGroupInfo(info.toString()) + .build() + .buildException(); + } + return roleRecord.getPolicy().provide(this, properties); + } + + /** + * Get a Phoenix connection against the current active HBase cluster. + * <p> + * If there is no active cluster, it will throw exception instead of blocking or retrying. + * + * @param properties connection properties + * @return a Phoenix connection to current active HBase cluster + * @throws SQLException if fails to get a connection + */ + PhoenixConnection connectActive(final Properties properties) throws SQLException { + try { + Optional<String> url = roleRecord.getActiveUrl(); + if (state == State.READY && url.isPresent()) { + PhoenixConnection conn = connectToOneCluster(url.get(), properties); + // After connection is created, double check if the cluster is still ACTIVE + // This is to make sure the newly created connection will not be returned to client + // if the target cluster is not active any more. This can happen during failover. + if (state == State.READY && isActive(conn)) { + return conn; + } else { + conn.close(); Review Comment: Is there any way for the conn to leak here? I don't immediately see one, but worth a second look since there's no easy way to put the close() in a finally block. ########## phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java: ########## @@ -0,0 +1,634 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + +import java.io.Closeable; +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser; +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.atomic.AtomicValue; +import org.apache.curator.framework.recipes.atomic.DistributedAtomicValue; +import org.apache.curator.utils.ZKPaths; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.apache.phoenix.util.JacksonUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The command line tool to manage high availability (HA) groups and their cluster roles. + */ +public class PhoenixHAAdminTool extends Configured implements Tool { + // Following are return value of this tool. We need this to be very explicit because external + // system calling this tool may need to retry, alert or audit the operations of cluster roles. 
+ public static final int RET_SUCCESS = 0; // Saul Goodman + public static final int RET_ARGUMENT_ERROR = 1; // arguments are invalid + public static final int RET_SYNC_ERROR = 2; // error to sync from manifest to ZK + public static final int RET_REPAIR_FOUND_INCONSISTENCIES = 3; // error to repair current ZK + + private static final Logger LOG = LoggerFactory.getLogger(PhoenixHAAdminTool.class); + + private static final Option HELP_OPT = new Option("h", "help", false, "Show this help"); + private static final Option FORCEFUL_OPT = + new Option("F", "forceful", false, + "Forceful writing cluster role records ignoring errors on other clusters"); + private static final Option MANIFEST_OPT = + new Option("m", "manifest", true, "Manifest file containing cluster role records"); + private static final Option LIST_OPT = + new Option("l", "list", false, "List all HA groups stored on this ZK cluster"); + private static final Option REPAIR_OPT = new Option("r", "repair", false, + "Verify all HA groups stored on this ZK cluster and repair if inconsistency found"); + @VisibleForTesting + static final Options OPTIONS = new Options() + .addOption(HELP_OPT) + .addOption(FORCEFUL_OPT) + .addOption(MANIFEST_OPT) + .addOption(LIST_OPT) + .addOption(REPAIR_OPT); + + @Override + public int run(String[] args) throws Exception { + CommandLine commandLine; + try { + commandLine = parseOptions(args); + } catch (Exception e) { + System.err.println( + "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: " + e); + printUsageMessage(); + return RET_ARGUMENT_ERROR; + } + + if (commandLine.hasOption(HELP_OPT.getOpt())) { + printUsageMessage(); + return RET_SUCCESS; + } else if (commandLine.hasOption(LIST_OPT.getOpt())) { // list + String zkUrl = getLocalZkUrl(getConf()); // Admin is created against local ZK cluster + try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { + List<ClusterRoleRecord> records = admin.listAllClusterRoleRecordsOnZookeeper(); + JacksonUtil.getObjectWriterPretty().writeValue(System.out, records); + } + } else if (commandLine.hasOption(MANIFEST_OPT.getOpt())) { // create or update + String fileName = commandLine.getOptionValue(MANIFEST_OPT.getOpt()); + List<ClusterRoleRecord> records = readRecordsFromFile(fileName); + boolean forceful = commandLine.hasOption(FORCEFUL_OPT.getOpt()); + Map<String, List<String>> failedHaGroups = syncClusterRoleRecords(records, forceful); + if (!failedHaGroups.isEmpty()) { + System.out.println("Found following HA groups are failing to write the clusters:"); + failedHaGroups.forEach((k, v) -> + System.out.printf("%s -> [%s]\n", k, String.join(",", v))); + return RET_SYNC_ERROR; + } + } else if (commandLine.hasOption(REPAIR_OPT.getOpt())) { // verify and repair + String zkUrl = getLocalZkUrl(getConf()); // Admin is created against local ZK cluster + try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { + List<String> inconsistentRecord = admin.verifyAndRepairWithRemoteZnode(); + if (!inconsistentRecord.isEmpty()) { + System.out.println("Found following inconsistent cluster role records: "); + System.out.print(String.join(",", inconsistentRecord)); + return RET_REPAIR_FOUND_INCONSISTENCIES; + } + } + } + return RET_SUCCESS; + } + + /** + * Read cluster role records defined in the file, given file name. 
+ * + * @param file The local manifest file name to read from + * @return list of cluster role records defined in the manifest file + * @throws Exception when parsing or reading from the input file + */ + @VisibleForTesting + List<ClusterRoleRecord> readRecordsFromFile(String file) throws Exception { + Preconditions.checkArgument(!StringUtils.isEmpty(file)); + String fileType = FilenameUtils.getExtension(file); + switch (fileType) { + case "json": + // TODO: use jackson or standard JSON library according to PHOENIX-5789 + try (Reader reader = new FileReader(file)) { + ClusterRoleRecord[] records = + JacksonUtil.getObjectReader(ClusterRoleRecord[].class).readValue(reader); + return Arrays.asList(records); + } + case "yaml": + LOG.error("YAML file is not yet supported. See W-8274533"); + default: + throw new Exception("Can not read cluster role records from file '" + file + "' " + + "reason: unsupported file type"); + } + } + + /** + * Helper method to write the given cluster role records into the ZK clusters respectively. + * + * // TODO: add retry logics + * + * @param records The cluster role record list to save on ZK + * @param forceful if true, this method will ignore errors on other clusters; otherwise it will + * not update next cluster (in order) if there is any failure on current cluster + * @return a map of HA group name to list cluster's url for cluster role record failing to write + */ + private Map<String, List<String>> syncClusterRoleRecords(List<ClusterRoleRecord> records, + boolean forceful) throws IOException { + Map<String, List<String>> failedHaGroups = new HashMap<>(); + for (ClusterRoleRecord record : records) { + String haGroupName = record.getHaGroupName(); + try (PhoenixHAAdminHelper admin1 = new PhoenixHAAdminHelper(record.getZk1(), getConf(), HighAvailibilityCuratorProvider.INSTANCE); + PhoenixHAAdminHelper admin2 = new PhoenixHAAdminHelper(record.getZk2(), getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { + // Update the cluster previously ACTIVE cluster first. + // It reduces the chances of split-brain between clients and clusters. + // If can not determine previous ACTIVE cluster, update new STANDBY cluster first. + final PairOfSameType<PhoenixHAAdminHelper> pair; + if (admin1.isCurrentActiveCluster(haGroupName)) { + pair = new PairOfSameType<>(admin1, admin2); + } else if (admin2.isCurrentActiveCluster(haGroupName)) { + pair = new PairOfSameType<>(admin2, admin1); + } else if (record.getRole(admin1.getZkUrl()) == ClusterRole.STANDBY) { + pair = new PairOfSameType<>(admin1, admin2); + } else { + pair = new PairOfSameType<>(admin2, admin1); Review Comment: how would we ever get here? (if neither 1 nor 2 are ACTIVE, and 1 isn't STANDBY?) ########## phoenix-core/src/test/java/org/apache/phoenix/jdbc/ParallelPhoenixResultSetTest.java: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.jdbc; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; + +public class ParallelPhoenixResultSetTest { + CompletableFuture<ResultSet> completableRs1; + CompletableFuture<ResultSet> completableRs2; + + ParallelPhoenixResultSet resultSet; + + @Before + public void init() { + completableRs1 = Mockito.mock(CompletableFuture.class); + completableRs2 = Mockito.mock(CompletableFuture.class); + resultSet = + new ParallelPhoenixResultSet( + new ParallelPhoenixContext(new Properties(), null, + HighAvailabilityTestingUtility + .getListOfSingleThreadExecutorServices(), + null), + completableRs1, completableRs2); + } + + @Test + public void testUnbound() throws SQLException { + ResultSet rs = resultSet.getResultSet(); + assertNull(rs); + } + + @Test + public void testNextBound() throws SQLException { + ResultSet rs = Mockito.mock(ResultSet.class); + resultSet.setResultSet(rs); + resultSet.next(); + Mockito.verify(rs).next(); + Mockito.verifyNoMoreInteractions(rs); + } + + @Test + public void testRS1WinsNext() throws Exception { + + ResultSet rs1 = Mockito.mock(ResultSet.class); + ResultSet rs2 = Mockito.mock(ResultSet.class); + + Executor rsExecutor2 = Mockito.mock(Executor.class); + + CountDownLatch latch = new CountDownLatch(1); + + //inject a sleep + doAnswer( + (InvocationOnMock invocation) -> { + Thread thread = new Thread(() -> { + try { + //TODO: Remove this sleep + latch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(); + } + ((Runnable) invocation.getArguments()[0]).run(); + return; + }); + thread.start(); + return null; + } + ).when(rsExecutor2).execute(any(Runnable.class)); + + completableRs1 = CompletableFuture.completedFuture(rs1); + + completableRs2 = CompletableFuture.supplyAsync(() -> rs2, rsExecutor2); + + resultSet = + new ParallelPhoenixResultSet( + new ParallelPhoenixContext(new Properties(), null, + HighAvailabilityTestingUtility + .getListOfSingleThreadExecutorServices(), + null), + completableRs1, completableRs2); + + resultSet.next(); + + assertEquals(rs1, resultSet.getResultSet()); + } + + @Test + public void testRS2WinsNext() throws Exception { + ResultSet rs1 = Mockito.mock(ResultSet.class); + ResultSet rs2 = Mockito.mock(ResultSet.class); + + Executor rsExecutor1 = Mockito.mock(Executor.class); + CountDownLatch latch = new CountDownLatch(1); + //inject a sleep + doAnswer( + (InvocationOnMock invocation) -> { + Thread thread = new Thread(() -> { + try { + latch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(); + } + ((Runnable) invocation.getArguments()[0]).run(); + return; + }); + thread.start(); + return null; + } + ).when(rsExecutor1).execute(any(Runnable.class)); + + completableRs1 = CompletableFuture.supplyAsync(() -> rs1, rsExecutor1); + 
completableRs2 = CompletableFuture.completedFuture(rs2); + + resultSet = + new ParallelPhoenixResultSet( + new ParallelPhoenixContext(new Properties(), null, + HighAvailabilityTestingUtility + .getListOfSingleThreadExecutorServices(), + null), + completableRs1, completableRs2); + + resultSet.next(); + + assertEquals(rs2, resultSet.getResultSet()); + } + + @Test + public void testRS1FailsImmediatelyNext() throws Exception { + ResultSet rs2 = Mockito.mock(ResultSet.class); + Executor rsExecutor2 = Mockito.mock(Executor.class); + CountDownLatch latch = new CountDownLatch(1); + //inject a sleep + doAnswer( + (InvocationOnMock invocation) -> { + Thread thread = new Thread(() -> { + try { + latch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(); + } + ((Runnable) invocation.getArguments()[0]).run(); + return; + }); + thread.start(); + return null; + } + ).when(rsExecutor2).execute(any(Runnable.class)); + + //CompletableFuture.failedFuture(new RuntimeException("Failure")); Review Comment: nit: commented code can be removed ########## phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + +import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.RET_SUCCESS; +import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.getLocalZkUrl; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.contains; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.List; + +import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.GetDataBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit test for {@link PhoenixHAAdminTool} including the helper class {@link PhoenixHAAdminHelper}. + * + * @see PhoenixHAAdminToolIT + */ +public class PhoenixHAAdminToolTest { + private static final Logger LOG = LoggerFactory.getLogger(PhoenixHAAdminToolTest.class); + private static final String ZK1 = "zk1:2181:/hbase"; + private static final String ZK2 = "zk2:2181:/hbase"; + private static final PrintStream STDOUT = System.out; + private static final ByteArrayOutputStream STDOUT_CAPTURE = new ByteArrayOutputStream(); + + private final PhoenixHAAdminTool.HighAvailibilityCuratorProvider mockHighAvailibilityCuratorProvider = Mockito.mock(PhoenixHAAdminTool.HighAvailibilityCuratorProvider.class); + + /** Use mocked curator since there is no mini-ZK cluster. */ + private final CuratorFramework curator = Mockito.mock(CuratorFramework.class); + /** HA admin to test for one test case. */ + private final PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(ZK1, new Configuration(), mockHighAvailibilityCuratorProvider); + + private String haGroupName; + private ClusterRoleRecord recordV1; + + @Rule + public final TestName testName = new TestName(); + + @Before + public void setup() throws Exception { + when(mockHighAvailibilityCuratorProvider.getCurator(Mockito.anyString(),Mockito.anyObject())).thenReturn(curator); + haGroupName = testName.getMethodName(); + recordV1 = new ClusterRoleRecord( + haGroupName, HighAvailabilityPolicy.FAILOVER, + ZK1, ClusterRole.ACTIVE, + ZK2, ClusterRole.STANDBY, + 1); + saveRecordV1ToZk(); + } + + @After + public void after() { + // reset STDOUT in case it was captured for testing + System.setOut(STDOUT); + } + + /** + * Test command line options. + * + * In the test body, we split sections by {} to make sure no variable is reused in mistakenly. 
+ */ + @Test + public void testCommandLineOption() throws Exception { + { // no value for -m option + String[] args = { "-m" }; + int ret = ToolRunner.run(new PhoenixHAAdminTool(), args); + assertEquals(PhoenixHAAdminTool.RET_ARGUMENT_ERROR, ret); + } + { // -l does not work with -m option + String[] args = { "-l", "-m", "cluster-role-records.yaml" }; + int ret = ToolRunner.run(new PhoenixHAAdminTool(), args); + assertEquals(PhoenixHAAdminTool.RET_ARGUMENT_ERROR, ret); + } + { // -l does not work with -F/--forceful option + String[] args = { "-l", "-F"}; + int ret = ToolRunner.run(new PhoenixHAAdminTool(), args); + assertEquals(PhoenixHAAdminTool.RET_ARGUMENT_ERROR, ret); + } + { // -l does not work with --repair option + String[] args = { "-l", "-r"}; + int ret = ToolRunner.run(new PhoenixHAAdminTool(), args); + assertEquals(PhoenixHAAdminTool.RET_ARGUMENT_ERROR, ret); + } + { // -m does not work with --repair option + String[] args = { "-m", "cluster-role-records.yaml", "-r"}; + int ret = ToolRunner.run(new PhoenixHAAdminTool(), args); + assertEquals(PhoenixHAAdminTool.RET_ARGUMENT_ERROR, ret); + } + } + + /** + * Test that helper method works for reading cluster role records from JSON file. + */ + @Test + public void testReadRecordsFromFileJson() throws Exception { + { // one record in JSON file + String fileName = ClusterRoleRecordTest.createJsonFileWithRecords(recordV1); + List<ClusterRoleRecord> records = new PhoenixHAAdminTool().readRecordsFromFile(fileName); + assertEquals(1, records.size()); + assertTrue(records.contains(recordV1)); + } + + { // two records in JSON file + String haGroupName2 = haGroupName + RandomStringUtils.randomAlphabetic(3); + ClusterRoleRecord record2 = new ClusterRoleRecord( + haGroupName2, HighAvailabilityPolicy.FAILOVER, + ZK1, ClusterRole.ACTIVE, + ZK2, ClusterRole.STANDBY, + 1); + String fileName = ClusterRoleRecordTest.createJsonFileWithRecords(recordV1, record2); + List<ClusterRoleRecord> records = new PhoenixHAAdminTool().readRecordsFromFile(fileName); + assertEquals(2, records.size()); + assertTrue(records.contains(recordV1)); + assertTrue(records.contains(record2)); + } + } + + /** + * Test that agent will try to crate znode if it does not exist. Review Comment: tiny nit: "create znode" ########## phoenix-core/src/test/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionTest.java: ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.exception.FailoverSQLException; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.HighAvailabilityGroup.HAGroupInfo; + +/** + * Unit test for {@link FailoverPhoenixConnection}. + * + * @see FailoverPhoenixConnectionIT + */ +public class FailoverPhoenixConnectionTest { + private static final Logger LOG = LoggerFactory.getLogger(FailoverPhoenixConnectionTest.class); + + @Mock PhoenixConnection connection1; + @Mock PhoenixConnection connection2; + @Mock HighAvailabilityGroup haGroup; + + final HAGroupInfo haGroupInfo = new HAGroupInfo("fake", "zk1", "zk2"); + FailoverPhoenixConnection failoverConnection; // this connection itself is not mocked or spied. + + @Before + public void init() throws SQLException { + MockitoAnnotations.initMocks(this); + when(haGroup.getGroupInfo()).thenReturn(haGroupInfo); + when(haGroup.connectActive(any(Properties.class))).thenReturn(connection1); + + failoverConnection = new FailoverPhoenixConnection(haGroup, new Properties()); + } + + /** + * Test helper method {@link FailoverPhoenixConnection#wrapActionDuringFailover}. + */ + @Test + public void testWrapActionDuringFailover() throws SQLException { + // Test SupplierWithSQLException which returns a value + String str = "Hello, World!"; + assertEquals(str, failoverConnection.wrapActionDuringFailover(() -> str)); + + // Test RunWithSQLException which does not return value + final AtomicInteger counter = new AtomicInteger(0); + failoverConnection.wrapActionDuringFailover(counter::incrementAndGet); + assertEquals(1, counter.get()); + } + + /** + * Test that after calling failover(), the old connection got closed with FailoverSQLException, + * and a new Phoenix connection is opened. + */ + @Test + public void testFailover() throws SQLException { + // Make HAGroup return a different phoenix connection when it gets called next time + when(haGroup.connectActive(any(Properties.class))).thenReturn(connection2); + + // explicit call failover + failoverConnection.failover(1000L); + + // The old connection should have been closed due to failover + verify(connection1, times(1)).close(any(FailoverSQLException.class)); + // A new Phoenix connection is wrapped underneath + assertEquals(connection2, failoverConnection.getWrappedConnection()); + } + + /** + * Test static {@link FailoverPhoenixConnection#failover(Connection, long)} method. 
+ */ + @Test + public void testFailoverStatic() throws SQLException { + try { + FailoverPhoenixConnection.failover(connection1, 1000L); + fail("Should have failed since plain phoenix connection can not failover!"); + } catch (SQLException e) { + LOG.info("Got expected exception when trying to failover on non-HA connection", e); Review Comment: nit: can we match the SQLExceptionCode to make sure it's the right error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
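Following up on the SQLExceptionCode nit above: a minimal sketch of what the stricter assertion could look like, assuming the surrounding test class from this diff (its connection1 field, LOG, and the existing SQLExceptionCode import) is in scope. The constant name below is a placeholder, since the exact error code raised when failover() is invoked on a plain PhoenixConnection is not visible in this diff; the usual idiom is to compare e.getErrorCode() against the expected SQLExceptionCode's getErrorCode().

    @Test
    public void testFailoverStatic() throws SQLException {
        try {
            FailoverPhoenixConnection.failover(connection1, 1000L);
            fail("Should have failed since plain phoenix connection can not failover!");
        } catch (SQLException e) {
            LOG.info("Got expected exception when trying to failover on non-HA connection", e);
            // Placeholder constant: substitute whichever SQLExceptionCode the HA framework
            // actually raises when failover() is called on a non-failover connection.
            assertEquals(SQLExceptionCode.CANNOT_FAILOVER_NON_HA_CONNECTION.getErrorCode(),
                    e.getErrorCode());
        }
    }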

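And on the earlier nit in the ParallelPhoenixResultSet test about the commented-out CompletableFuture.failedFuture(new RuntimeException("Failure")) call: failedFuture(Throwable) only exists from Java 9 onward, which may be why the line was left commented out. If an immediately-failed future is still wanted on a Java 8 toolchain, completing one exceptionally by hand is an equivalent sketch (using the CompletableFuture and ResultSet types the test already uses); otherwise deleting the commented line, as suggested, is enough.

    // Illustrative Java 8-compatible stand-in for CompletableFuture.failedFuture(...):
    CompletableFuture<ResultSet> failedRs = new CompletableFuture<>();
    failedRs.completeExceptionally(new RuntimeException("Failure"));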