This is an automated email from the ASF dual-hosted git repository. stoty pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit e8bf61ad4005ccd0aefbe234a38959897ce535c5 Author: Daniel Wong <daniel.w...@salesforce.com> AuthorDate: Mon Apr 27 03:38:54 2020 -0700 PHOENIX-5872 Close Internal Phoenix Connections that were running during cancel Signed-off-by: Xinyi Yan <yanxi...@apache.org> --- phoenix-core/pom.xml | 4 + .../phoenix/query/MaxConcurrentConnectionsIT.java | 132 +++++++++++++++++++++ .../apache/phoenix/util/DelayedRegionServer.java | 124 +++++++++++++++++++ .../compile/MutatingParallelIteratorFactory.java | 1 + .../apache/phoenix/exception/SQLExceptionCode.java | 3 + .../org/apache/phoenix/jdbc/PhoenixConnection.java | 59 +++++++-- .../phoenix/monitoring/GlobalClientMetrics.java | 2 + .../org/apache/phoenix/monitoring/MetricType.java | 1 + .../phoenix/query/ConnectionQueryServicesImpl.java | 57 ++++++--- .../org/apache/phoenix/query/QueryServices.java | 3 + .../apache/phoenix/query/QueryServicesOptions.java | 2 + 11 files changed, 365 insertions(+), 23 deletions(-) diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index 33a8013..7b91753 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -318,6 +318,10 @@ <groupId>org.apache.hbase.thirdparty</groupId> <artifactId>hbase-shaded-miscellaneous</artifactId> </dependency> + <dependency> + <groupId>org.apache.hbase.thirdparty</groupId> + <artifactId>hbase-shaded-protobuf</artifactId> + </dependency> <!-- HBase test dependencies --> <dependency> diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java new file mode 100644 index 0000000..7da276c --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.query; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.util.DelayedRegionServer; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static org.apache.phoenix.exception.SQLExceptionCode.NEW_CONNECTION_THROTTLED; +import static org.apache.phoenix.exception.SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS; +import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS; +import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS; +import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED; +import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB; +import static 
org.apache.phoenix.query.QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Note that some tests for concurrentConnections live in PhoenixMetricsIT.java which also tests the metric emission + */ +public class MaxConcurrentConnectionsIT extends BaseUniqueNamesOwnClusterIT { + + private static HBaseTestingUtility hbaseTestUtil; + + @BeforeClass + public static void setUp() throws Exception { + hbaseTestUtil = new HBaseTestingUtility(); + + hbaseTestUtil.startMiniCluster(1,1,null,null,DelayedRegionServer.class); + // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver + String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); + url = PhoenixRuntime.JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkQuorum + + JDBC_PROTOCOL_SEPARATOR + "uniqueConn=A"; + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + private String getUniqueUrl() { + return url + generateUniqueName(); + } + + //Have to shutdown our special delayed region server + @AfterClass + public static void tearDown() throws Exception { + hbaseTestUtil.shutdownMiniCluster(); + } + + /** + * This tests the delete path which creates an internal phoenix connection per region + * @throws Exception + */ + @Test + public void testDeleteRuntimeFailureClosesConnections() throws Exception { + String tableName = generateUniqueName(); + String connectionUrl = getUniqueUrl(); + //table with lots of regions + String ddl = "create table " + tableName + " (i integer not null primary key, j integer) SALT_BUCKETS=256 "; + + Properties props = new Properties(); + props.setProperty(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,String.valueOf(10)); + props.setProperty(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,String.valueOf(10)); + + //delay any task handling as that 
causes additional connections + props.setProperty(TASK_HANDLING_INTERVAL_MS_ATTRIB,String.valueOf(600000)); + props.setProperty(TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,String.valueOf(600000)); + + String deleteStmt = "DELETE FROM " + tableName + " WHERE 20 = j"; + + try(Connection conn = DriverManager.getConnection(connectionUrl, props); Statement statement = conn.createStatement()) { + statement.execute(ddl); + } + + assertEquals(0, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue()); + assertEquals(0, GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue()); + Connection conn = null; + try { + conn = DriverManager.getConnection(connectionUrl, props); + //Enable delay for the delete + DelayedRegionServer.setDelayEnabled(true); + try (Statement statement = conn.createStatement()) { + statement.execute(deleteStmt); + } + fail(); + } catch (SQLException e) { + assertEquals(NEW_INTERNAL_CONNECTION_THROTTLED.getErrorCode(), e.getErrorCode()); + assertEquals(NEW_INTERNAL_CONNECTION_THROTTLED.getSQLState(), e.getSQLState()); + } finally { + DelayedRegionServer.setDelayEnabled(false); + if (conn != null) { + conn.close(); + } + long connections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue(); + assertEquals(String.format("Found %d connections still open.", connections),0,connections); + connections = GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue(); + assertEquals(String.format("Found %d internal connections still open.", connections),0 ,connections); + } + + } +} \ No newline at end of file diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java b/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java new file mode 100644 index 0000000..95bf3f4 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * This is an extended MiniHBaseCluster Region Server which allows a developer/tester to inject + * delay into specific server side operations for testing. 
+ */ +public class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer { + + private static final Logger LOGGER = LoggerFactory.getLogger(DelayedRegionServer.class); + + static boolean doDelay = false; + // Activate the delays after table creation to test get/scan/put + private static int DELAY_GET = 0; + private static int DELAY_SCAN = 30000; + private static int DELAY_MUTATE = 0; + + public static void setDelayEnabled(boolean delay) { + doDelay = delay; + } + + public static void setDelayGet(int delayGet) { + DELAY_GET = delayGet; + } + + public static void setDelayScan(int delayScan) { + DELAY_SCAN = delayScan; + } + + public static void setDelayMutate(int delayMutate) { + DELAY_MUTATE = delayMutate; + } + + public DelayedRegionServer(Configuration conf) + throws IOException, InterruptedException { + super(conf); + } + + @Override protected RSRpcServices createRpcServices() throws IOException { + return new DelayedRSRpcServices(this); + } + + /** + * This class injects a delay into Rpc calls, if delay is enabled, and afterwards executes the super methods. 
+ */ + public static class DelayedRSRpcServices extends RSRpcServices { + + DelayedRSRpcServices(HRegionServer rs) throws IOException { + super(rs); + } + + @Override public GetResponse get(final RpcController controller, + final GetRequest request) throws ServiceException { + try { + if (doDelay) { + Thread.sleep(DELAY_GET); + } + } catch (InterruptedException e) { + LOGGER.error("Sleep interrupted during get operation", e); + } + return super.get(controller, request); + } + + @Override public MutateResponse mutate(final RpcController rpcc, + final MutateRequest request) throws ServiceException { + try { + if (doDelay) { + Thread.sleep(DELAY_MUTATE); + } + } catch (InterruptedException e) { + LOGGER.error("Sleep interrupted during mutate operation", e); + } + return super.mutate(rpcc, request); + } + + @Override public ScanResponse scan(final RpcController controller, + ScanRequest request) throws ServiceException { + try { + if (doDelay) { + Thread.sleep(DELAY_SCAN); + } + } catch (InterruptedException e) { + LOGGER.error("Sleep interrupted during scan operation", e); + } + return super.scan(controller, request); + } + } +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java index 4aaa72d..7113867 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java @@ -66,6 +66,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato QueryPlan plan) throws SQLException { final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection); + connection.addChildConnection(clonedConnection); try { MutationState state = mutate(parentContext, iterator, clonedConnection); diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index bb6d629..17157db 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -524,6 +524,9 @@ public enum SQLExceptionCode { info.getMutationSizeBytes()); } }), + NEW_INTERNAL_CONNECTION_THROTTLED(731, "410M1", "Could not create connection " + + "because the internal connections already has the maximum number" + + " of connections to the target cluster."), MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED(732, "LIM03", "The Phoenix Column size is bigger than maximum " + "HBase client key value allowed size for ONE_CELL_PER_COLUMN table, " + diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 25f0f3e..ae47e7d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -19,6 +19,7 @@ package org.apache.phoenix.jdbc; import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull; import static java.util.Collections.emptyMap; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER; @@ -53,6 +54,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; @@ -173,6 +175,13 @@ public class PhoenixConnection 
implements Connection, MetaDataMutated, SQLClosea private Double logSamplingRate; private String sourceOfOperation; + private Object queueCreationLock = new Object(); // lock for the lazy init path of childConnections structure + private ConcurrentLinkedQueue<PhoenixConnection> childConnections = null; + + //For now just the copy constructor paths will have this as true as I don't want to change the + //public interfaces. + private final boolean isInternalConnection; + static { Tracing.addTraceMetricsSource(); } @@ -189,7 +198,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this(connection.getQueryServices(), connection.getURL(), connection .getClientInfo(), connection.metaData, connection .getMutationState(), isDescRowKeyOrderUpgrade, - isRunningUpgrade, connection.buildingIndex); + isRunningUpgrade, connection.buildingIndex, true); this.isAutoCommit = connection.isAutoCommit; this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; @@ -206,7 +215,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this(connection.getQueryServices(), connection.getURL(), connection .getClientInfo(), connection.getMetaDataCache(), mutationState, connection.isDescVarLengthRowKeyUpgrade(), connection - .isRunningUpgrade(), connection.buildingIndex); + .isRunningUpgrade(), connection.buildingIndex, true); } public PhoenixConnection(PhoenixConnection connection, long scn) @@ -217,7 +226,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea public PhoenixConnection(PhoenixConnection connection, Properties props) throws SQLException { this(connection.getQueryServices(), connection.getURL(), props, connection.metaData, connection .getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), - connection.isRunningUpgrade(), connection.buildingIndex); + connection.isRunningUpgrade(), connection.buildingIndex, true); this.isAutoCommit = connection.isAutoCommit; 
this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; @@ -226,7 +235,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException { - this(services, url, info, metaData, null, false, false, false); + this(services, url, info, metaData, null, false, false, false, false); } public PhoenixConnection(PhoenixConnection connection, @@ -234,16 +243,17 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea throws SQLException { this(services, connection.url, info, connection.metaData, null, connection.isDescVarLengthRowKeyUpgrade(), connection - .isRunningUpgrade(), connection.buildingIndex); + .isRunningUpgrade(), connection.buildingIndex, true); } private PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade, - boolean buildingIndex) throws SQLException { + boolean buildingIndex, boolean isInternalConnection) throws SQLException { GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment(); this.url = url; this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade; + this.isInternalConnection = isInternalConnection; // Filter user provided properties based on property policy, if // provided and QueryServices.PROPERTY_POLICY_PROVIDER_ENABLED is true @@ -389,9 +399,13 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE, QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE)); + if(isInternalConnection) { + GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.increment(); + } else { + GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment(); + } this.sourceOfOperation = 
this.services.getProps().get(QueryServices.SOURCE_OPERATION_ATTRIB, null); - GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment(); } private static void checkScn(Long scnParam) throws SQLException { @@ -444,6 +458,26 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea return result.build(); } + public boolean isInternalConnection() { + return isInternalConnection; + } + + /** + * This method, and *only* this method is thread safe + * @param connection + */ + public void addChildConnection(PhoenixConnection connection) { + //double check for performance + if(childConnections == null) { + synchronized (queueCreationLock) { + if (childConnections == null) { + childConnections = new ConcurrentLinkedQueue<>(); + } + } + } + childConnections.add(connection); + } + public Sampler<?> getSampler() { return this.sampler; } @@ -658,13 +692,22 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea traceScope.close(); } closeStatements(); + synchronized (queueCreationLock) { + if (childConnections != null) { + SQLCloseables.closeAllQuietly(childConnections); + } + } } finally { services.removeConnection(this); } } finally { isClosed = true; - GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement(); + if(isInternalConnection()){ + GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement(); + } else { + GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement(); + } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java index 9ebc1af..dce01d5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java @@ -26,6 +26,7 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES; import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME; import static 
org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER; import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS; +import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.MetricType.INDEX_COMMIT_FAILURE_SIZE; import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER; @@ -99,6 +100,7 @@ public enum GlobalClientMetrics { GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER), GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER), GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER), + GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS(OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER), GLOBAL_QUERY_SERVICES_COUNTER(QUERY_SERVICES_COUNTER), GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER), GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(PHOENIX_CONNECTIONS_THROTTLED_COUNTER), diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java index 1396a89..40fcad0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -58,6 +58,7 @@ public enum MetricType { WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE), RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE), OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE), + OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER("io", "Number of open internal phoenix connections",LogLevel.OFF, PLong.INSTANCE), QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE), HCONNECTIONS_COUNTER("h", "Number 
of HConnections created by phoenix driver",LogLevel.OFF, PLong.INSTANCE), PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix connections prevented from opening " + diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 597ae27..585832e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -339,6 +339,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement @GuardedBy("connectionCountLock") private int connectionCount = 0; + + @GuardedBy("connectionCountLock") + private int internalConnectionCount = 0; + private final Object connectionCountLock = new Object(); private final boolean returnSequenceValues ; @@ -370,6 +374,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final boolean isAutoUpgradeEnabled; private final AtomicBoolean upgradeRequired = new AtomicBoolean(false); private final int maxConnectionsAllowed; + private final int maxInternalConnectionsAllowed; private final boolean shouldThrottleNumConnections; public static final byte[] MUTEX_LOCKED = "MUTEX_LOCKED".getBytes(); @@ -455,7 +460,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement this.isAutoUpgradeEnabled = config.getBoolean(AUTO_UPGRADE_ENABLED, QueryServicesOptions.DEFAULT_AUTO_UPGRADE_ENABLED); this.maxConnectionsAllowed = config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS); - this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0); + this.maxInternalConnectionsAllowed = config.getInt(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, + 
QueryServicesOptions.DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS); + this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0) || (maxInternalConnectionsAllowed > 0); if (!QueryUtil.isServerConnection(props)) { //Start queryDistruptor everytime as log level can be change at connection level as well, but we can avoid starting for server connections. try { @@ -5221,12 +5228,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public void addConnection(PhoenixConnection connection) throws SQLException { if (returnSequenceValues || shouldThrottleNumConnections) { synchronized (connectionCountLock) { - if (shouldThrottleNumConnections && connectionCount + 1 > maxConnectionsAllowed){ - GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment(); - throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED). - build().buildException(); + + /* + * If we are throttling connections internal connections and client created connections + * are counted separately against each respective quota. + */ + if(shouldThrottleNumConnections) { + int futureConnections = 1 + ( connection.isInternalConnection() ? internalConnectionCount : connectionCount); + int allowedConnections = connection.isInternalConnection() ? maxInternalConnectionsAllowed : maxConnectionsAllowed; + if(allowedConnections != 0 && futureConnections > allowedConnections) { + GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment(); + if(connection.isInternalConnection()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED). + build().buildException(); + } + throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED). + build().buildException(); + } + } + + if(!connection.isInternalConnection()) { + connectionCount++; + } else { + internalConnectionCount++; } - connectionCount++; } } // If lease renewal isn't enabled, these are never cleaned up. 
Tracking when renewals @@ -5241,15 +5266,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (returnSequenceValues) { ConcurrentMap<SequenceKey,Sequence> formerSequenceMap = null; synchronized (connectionCountLock) { - if (--connectionCount <= 0) { - if (!this.sequenceMap.isEmpty()) { - formerSequenceMap = this.sequenceMap; - this.sequenceMap = Maps.newConcurrentMap(); + if(!connection.isInternalConnection()) { + if (connectionCount + internalConnectionCount - 1 <= 0) { + if (!this.sequenceMap.isEmpty()) { + formerSequenceMap = this.sequenceMap; + this.sequenceMap = Maps.newConcurrentMap(); + } } } - if (connectionCount < 0) { - connectionCount = 0; - } } // Since we're using the former sequenceMap, we can do this outside // the lock. @@ -5257,9 +5281,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // When there are no more connections, attempt to return any sequences returnAllSequences(formerSequenceMap); } - } else if (shouldThrottleNumConnections){ //still need to decrement connection count + } + if (returnSequenceValues || shouldThrottleNumConnections){ //still need to decrement connection count synchronized (connectionCountLock) { - if (connectionCount > 0) { + if(connection.isInternalConnection() && internalConnectionCount > 0) { + --internalConnectionCount; + } else if (connectionCount > 0) { --connectionCount; } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index c869f2b..ac9a396 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -280,6 +280,9 @@ public interface QueryServices extends SQLCloseable { //max number of connections from a single client to a single cluster. 0 is unlimited. 
public static final String CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = "phoenix.client.connection.max.allowed.connections"; + //max number of internal connections from a single client to a single cluster. 0 is unlimited. + public static final String INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS = + "phoenix.internal.connection.max.allowed.connections"; public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB = "phoenix.default.column.encoded.bytes.attrib"; public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.immutable.storage.scheme"; public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.multitenant.immutable.storage.scheme"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 7bc34e6..85f932b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -320,6 +320,8 @@ public class QueryServicesOptions { //by default, max connections from one client to one cluster is unlimited public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0; + //by default, max internal connections from one client to one cluster is unlimited + public static final int DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0; public static final boolean DEFAULT_STATS_COLLECTION_ENABLED = true; public static final boolean DEFAULT_USE_STATS_FOR_PARALLELIZATION = true;