This is an automated email from the ASF dual-hosted git repository. yanxinyi pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 6372e95 PHOENIX-5872 Close Internal Phoenix Connections that were running during cancel 6372e95 is described below commit 6372e95c35f5498e4a2091bffab0e3c77cd984a6 Author: Daniel Wong <daniel.w...@salesforce.com> AuthorDate: Mon Apr 27 03:38:54 2020 -0700 PHOENIX-5872 Close Internal Phoenix Connections that were running during cancel Signed-off-by: Xinyi Yan <yanxi...@apache.org> --- .../phoenix/query/MaxConcurrentConnectionsIT.java | 132 +++++++++++++++++++++ .../apache/phoenix/util/DelayedRegionServer.java | 119 +++++++++++++++++++ .../compile/MutatingParallelIteratorFactory.java | 1 + .../apache/phoenix/exception/SQLExceptionCode.java | 3 + .../org/apache/phoenix/jdbc/PhoenixConnection.java | 59 +++++++-- .../phoenix/monitoring/GlobalClientMetrics.java | 2 + .../org/apache/phoenix/monitoring/MetricType.java | 1 + .../phoenix/query/ConnectionQueryServicesImpl.java | 57 ++++++--- .../org/apache/phoenix/query/QueryServices.java | 3 + .../apache/phoenix/query/QueryServicesOptions.java | 2 + 10 files changed, 356 insertions(+), 23 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java new file mode 100644 index 0000000..7da276c --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.query; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.util.DelayedRegionServer; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static org.apache.phoenix.exception.SQLExceptionCode.NEW_CONNECTION_THROTTLED; +import static org.apache.phoenix.exception.SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS; +import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS; +import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS; +import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED; +import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB; +import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Note that some tests for concurrentConnections live in PhoenixMetricsIT.java which also test the metric emission + */ +public class MaxConcurrentConnectionsIT extends BaseUniqueNamesOwnClusterIT { + + private static HBaseTestingUtility hbaseTestUtil; + + @BeforeClass + public static void setUp() throws Exception { + hbaseTestUtil = new HBaseTestingUtility(); + + hbaseTestUtil.startMiniCluster(1,1,null,null,DelayedRegionServer.class); + // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver + String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); + url = PhoenixRuntime.JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkQuorum + + JDBC_PROTOCOL_SEPARATOR + "uniqueConn=A"; + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + private String getUniqueUrl() { + return url + generateUniqueName(); + } + + //Have to shutdown our special delayed region server + @AfterClass + public static void tearDown() throws Exception { + hbaseTestUtil.shutdownMiniCluster(); + } + + /** + * This tests the delete path which creates a internal phoenix connection per region + * @throws Exception + */ + @Test + public void testDeleteRuntimeFailureClosesConnections() throws Exception { + String tableName = generateUniqueName(); + String connectionUrl = getUniqueUrl(); + //table with lots of regions + String ddl = "create table " + tableName + " (i integer not null primary key, j integer) SALT_BUCKETS=256 "; + + Properties props = new Properties(); + props.setProperty(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,String.valueOf(10)); + props.setProperty(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS,String.valueOf(10)); + + //delay any task handeling as that causes additional connections + props.setProperty(TASK_HANDLING_INTERVAL_MS_ATTRIB,String.valueOf(600000)); + props.setProperty(TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,String.valueOf(600000)); + + String deleteStmt = "DELETE FROM " + tableName + " WHERE 20 = j"; + + try(Connection conn = DriverManager.getConnection(connectionUrl, props); Statement statement = conn.createStatement()) { + statement.execute(ddl); + } + + assertEquals(0, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue()); + assertEquals(0, GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue()); + Connection conn = null; + try { + conn = DriverManager.getConnection(connectionUrl, props); + //Enable delay for the delete + DelayedRegionServer.setDelayEnabled(true); + try (Statement statement = conn.createStatement()) { + statement.execute(deleteStmt); + } + fail(); + } catch (SQLException e) { + assertEquals(NEW_INTERNAL_CONNECTION_THROTTLED.getErrorCode(), e.getErrorCode()); + assertEquals(NEW_INTERNAL_CONNECTION_THROTTLED.getSQLState(), e.getSQLState()); + } finally { + DelayedRegionServer.setDelayEnabled(false); + if (conn != null) { + conn.close(); + } + long connections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue(); + assertEquals(String.format("Found %d connections still open.", connections),0,connections); + connections = GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.getMetric().getValue(); + assertEquals(String.format("Found %d internal connections still open.", connections),0 ,connections); + } + + } +} \ No newline at end of file diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java b/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java new file mode 100644 index 0000000..bf02621 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/util/DelayedRegionServer.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.util; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * This is a extended MiniHbaseCluster Region Server whcih allows developer/tester to inject + * delay into specific server side operations for testing. + */ +public class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer { + + private static final Logger LOGGER = LoggerFactory.getLogger(DelayedRegionServer.class); + + static boolean doDelay = false; + // Activate the delays after table creation to test get/scan/put + private static int DELAY_GET = 0; + private static int DELAY_SCAN = 30000; + private static int DELAY_MUTATE = 0; + + public static void setDelayEnabled(boolean delay) { + doDelay = delay; + } + + public static void setDelayGet(int delayGet) { + DELAY_GET = delayGet; + } + + public static void setDelayScan(int delayScan) { + DELAY_SCAN = delayScan; + } + + public static void setDelayMutate(int delayMutate) { + DELAY_MUTATE = delayMutate; + } + + public DelayedRegionServer(Configuration conf, CoordinatedStateManager cp) + throws IOException, InterruptedException { + super(conf, cp); + } + + @Override protected RSRpcServices createRpcServices() throws IOException { + return new DelayedRSRpcServices(this); + } + + /** + * This class injects delay for Rpc calls and after executes super methods is delay is set. + */ + public static class DelayedRSRpcServices extends RSRpcServices { + + DelayedRSRpcServices(HRegionServer rs) throws IOException { + super(rs); + } + + @Override public ClientProtos.GetResponse get(RpcController controller, + ClientProtos.GetRequest request) throws ServiceException { + try { + if (doDelay) { + Thread.sleep(DELAY_GET); + } + } catch (InterruptedException e) { + LOGGER.error("Sleep interrupted during get operation", e); + } + return super.get(controller, request); + } + + @Override public ClientProtos.MutateResponse mutate(RpcController rpcc, + ClientProtos.MutateRequest request) throws ServiceException { + try { + if (doDelay) { + Thread.sleep(DELAY_MUTATE); + } + } catch (InterruptedException e) { + LOGGER.error("Sleep interrupted during mutate operation", e); + } + return super.mutate(rpcc, request); + } + + @Override public ClientProtos.ScanResponse scan(RpcController controller, + ClientProtos.ScanRequest request) throws ServiceException { + try { + if (doDelay) { + Thread.sleep(DELAY_SCAN); + } + } catch (InterruptedException e) { + LOGGER.error("Sleep interrupted during scan operation", e); + } + return super.scan(controller, request); + } + } +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java index c98862d..755f127 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java @@ -64,6 +64,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato QueryPlan plan) throws SQLException { final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection); + connection.addChildConnection(clonedConnection); try { MutationState state = mutate(parentContext, iterator, clonedConnection); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 3b4eeb0..055aa1f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -513,6 +513,9 @@ public enum SQLExceptionCode { info.getMutationSizeBytes()); } }), + NEW_INTERNAL_CONNECTION_THROTTLED(731, "410M1", "Could not create connection " + + "because the internal connections already has the maximum number" + + " of connections to the target cluster."), INSUFFICIENT_MEMORY(999, "50M01", "Unable to allocate enough memory."), HASH_JOIN_CACHE_NOT_FOUND(900, "HJ01", "Hash Join cache not found"), diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index d302bcf..7cb9982 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -19,6 +19,7 @@ package org.apache.phoenix.jdbc; import static com.google.common.base.Preconditions.checkNotNull; import static java.util.Collections.emptyMap; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER; @@ -53,6 +54,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; @@ -172,6 +174,13 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea private LogLevel logLevel; private Double logSamplingRate; + private Object queueCreationLock = new Object(); // lock for the lazy init path of childConnections structure + private ConcurrentLinkedQueue<PhoenixConnection> childConnections = null; + + //For now just the copy constructor paths will have this as true as I don't want to change the + //public interfaces. + private final boolean isInternalConnection; + static { Tracing.addTraceMetricsSource(); } @@ -188,7 +197,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this(connection.getQueryServices(), connection.getURL(), connection .getClientInfo(), connection.metaData, connection .getMutationState(), isDescRowKeyOrderUpgrade, - isRunningUpgrade, connection.buildingIndex); + isRunningUpgrade, connection.buildingIndex, true); this.isAutoCommit = connection.isAutoCommit; this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; @@ -205,7 +214,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this(connection.getQueryServices(), connection.getURL(), connection .getClientInfo(), connection.getMetaDataCache(), mutationState, connection.isDescVarLengthRowKeyUpgrade(), connection - .isRunningUpgrade(), connection.buildingIndex); + .isRunningUpgrade(), connection.buildingIndex, true); } public PhoenixConnection(PhoenixConnection connection, long scn) @@ -216,7 +225,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea public PhoenixConnection(PhoenixConnection connection, Properties props) throws SQLException { this(connection.getQueryServices(), connection.getURL(), props, connection.metaData, connection .getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), - connection.isRunningUpgrade(), connection.buildingIndex); + connection.isRunningUpgrade(), connection.buildingIndex, true); this.isAutoCommit = connection.isAutoCommit; this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; @@ -225,7 +234,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException { - this(services, url, info, metaData, null, false, false, false); + this(services, url, info, metaData, null, false, false, false, false); } public PhoenixConnection(PhoenixConnection connection, @@ -233,16 +242,17 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea throws SQLException { this(services, connection.url, info, connection.metaData, null, connection.isDescVarLengthRowKeyUpgrade(), connection - .isRunningUpgrade(), connection.buildingIndex); + .isRunningUpgrade(), connection.buildingIndex, true); } private PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade, - boolean buildingIndex) throws SQLException { + boolean buildingIndex, boolean isInternalConnection) throws SQLException { GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment(); this.url = url; this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade; + this.isInternalConnection = isInternalConnection; // Filter user provided properties based on property policy, if // provided and QueryServices.PROPERTY_POLICY_PROVIDER_ENABLED is true @@ -388,7 +398,11 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this.logSamplingRate = Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE, QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE)); - GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment(); + if(isInternalConnection) { + GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.increment(); + } else { + GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment(); + } } private static void checkScn(Long scnParam) throws SQLException { @@ -441,6 +455,26 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea return result.build(); } + public boolean isInternalConnection() { + return isInternalConnection; + } + + /** + * This method, and *only* this method is thread safe + * @param connection + */ + public void addChildConnection(PhoenixConnection connection) { + //double check for performance + if(childConnections == null) { + synchronized (queueCreationLock) { + if (childConnections == null) { + childConnections = new ConcurrentLinkedQueue<>(); + } + } + } + childConnections.add(connection); + } + public Sampler<?> getSampler() { return this.sampler; } @@ -655,13 +689,22 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea traceScope.close(); } closeStatements(); + synchronized (queueCreationLock) { + if (childConnections != null) { + SQLCloseables.closeAllQuietly(childConnections); + } + } } finally { services.removeConnection(this); } } finally { isClosed = true; - GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement(); + if(isInternalConnection()){ + GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement(); + } else { + GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement(); + } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java index f41ab52..ce73a79 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java @@ -26,6 +26,7 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES; import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME; import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER; import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS; +import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.QUERY_SERVICES_COUNTER; @@ -93,6 +94,7 @@ public enum GlobalClientMetrics { GLOBAL_FAILED_QUERY_COUNTER(QUERY_FAILED_COUNTER), GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER), GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER), + GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS(OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER), GLOBAL_QUERY_SERVICES_COUNTER(QUERY_SERVICES_COUNTER), GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER), GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(PHOENIX_CONNECTIONS_THROTTLED_COUNTER), diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java index 8e1de66..41d9e19 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -57,6 +57,7 @@ public enum MetricType { WALL_CLOCK_TIME_MS("tq", "Wall clock time elapsed for the overall query execution",LogLevel.INFO, PLong.INSTANCE), RESULT_SET_TIME_MS("tn", "Wall clock time elapsed for reading all records using resultSet.next()",LogLevel.INFO, PLong.INSTANCE), OPEN_PHOENIX_CONNECTIONS_COUNTER("o", "Number of open phoenix connections",LogLevel.OFF, PLong.INSTANCE), + OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER("io", "Number of open internal phoenix connections",LogLevel.OFF, PLong.INSTANCE), QUERY_SERVICES_COUNTER("cqs", "Number of ConnectionQueryServicesImpl instantiated",LogLevel.OFF, PLong.INSTANCE), HCONNECTIONS_COUNTER("h", "Number of HConnections created by phoenix driver",LogLevel.OFF, PLong.INSTANCE), PHOENIX_CONNECTIONS_THROTTLED_COUNTER("ct", "Number of client Phoenix connections prevented from opening " + diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index b708b75..c504df4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -319,6 +319,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement @GuardedBy("connectionCountLock") private int connectionCount = 0; + + @GuardedBy("connectionCountLock") + private int internalConnectionCount = 0; + private final Object connectionCountLock = new Object(); private final boolean returnSequenceValues ; @@ -350,6 +354,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final boolean isAutoUpgradeEnabled; private final AtomicBoolean upgradeRequired = new AtomicBoolean(false); private final int maxConnectionsAllowed; + private final int maxInternalConnectionsAllowed; private final boolean shouldThrottleNumConnections; public static final byte[] MUTEX_LOCKED = "MUTEX_LOCKED".getBytes(); @@ -436,7 +441,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement this.isAutoUpgradeEnabled = config.getBoolean(AUTO_UPGRADE_ENABLED, QueryServicesOptions.DEFAULT_AUTO_UPGRADE_ENABLED); this.maxConnectionsAllowed = config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS); - this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0); + this.maxInternalConnectionsAllowed = config.getInt(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, + QueryServicesOptions.DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS); + this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0) || (maxInternalConnectionsAllowed > 0); if (!QueryUtil.isServerConnection(props)) { //Start queryDistruptor everytime as log level can be change at connection level as well, but we can avoid starting for server connections. try { @@ -4957,12 +4964,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public void addConnection(PhoenixConnection connection) throws SQLException { if (returnSequenceValues || shouldThrottleNumConnections) { synchronized (connectionCountLock) { - if (shouldThrottleNumConnections && connectionCount + 1 > maxConnectionsAllowed){ - GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment(); - throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED). - build().buildException(); + + /* + * If we are throttling connections internal connections and client created connections + * are counted separately against each respective quota. + */ + if(shouldThrottleNumConnections) { + int futureConnections = 1 + ( connection.isInternalConnection() ? internalConnectionCount : connectionCount); + int allowedConnections = connection.isInternalConnection() ? maxInternalConnectionsAllowed : maxConnectionsAllowed; + if(allowedConnections != 0 && futureConnections > allowedConnections) { + GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment(); + if(connection.isInternalConnection()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED). + build().buildException(); + } + throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED). + build().buildException(); + } + } + + if(!connection.isInternalConnection()) { + connectionCount++; + } else { + internalConnectionCount++; } - connectionCount++; } } // If lease renewal isn't enabled, these are never cleaned up. Tracking when renewals @@ -4977,15 +5002,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (returnSequenceValues) { ConcurrentMap<SequenceKey,Sequence> formerSequenceMap = null; synchronized (connectionCountLock) { - if (--connectionCount <= 0) { - if (!this.sequenceMap.isEmpty()) { - formerSequenceMap = this.sequenceMap; - this.sequenceMap = Maps.newConcurrentMap(); + if(!connection.isInternalConnection()) { + if (connectionCount + internalConnectionCount - 1 <= 0) { + if (!this.sequenceMap.isEmpty()) { + formerSequenceMap = this.sequenceMap; + this.sequenceMap = Maps.newConcurrentMap(); + } } } - if (connectionCount < 0) { - connectionCount = 0; - } } // Since we're using the former sequenceMap, we can do this outside // the lock. @@ -4993,9 +5017,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // When there are no more connections, attempt to return any sequences returnAllSequences(formerSequenceMap); } - } else if (shouldThrottleNumConnections){ //still need to decrement connection count + } + if (returnSequenceValues || shouldThrottleNumConnections){ //still need to decrement connection count synchronized (connectionCountLock) { - if (connectionCount > 0) { + if(connection.isInternalConnection() && internalConnectionCount > 0) { + --internalConnectionCount; + } else if (connectionCount > 0) { --connectionCount; } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 34af089..77adc4f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -304,6 +304,9 @@ public interface QueryServices extends SQLCloseable { //max number of connections from a single client to a single cluster. 0 is unlimited. public static final String CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = "phoenix.client.connection.max.allowed.connections"; + //max number of connections from a single client to a single cluster. 0 is unlimited. + public static final String INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS = + "phoenix.internal.connection.max.allowed.connections"; public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB = "phoenix.default.column.encoded.bytes.attrib"; public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.immutable.storage.scheme"; public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.multitenant.immutable.storage.scheme"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 4e62e25..cfa0aec 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -343,6 +343,8 @@ public class QueryServicesOptions { //by default, max connections from one client to one cluster is unlimited public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0; + //by default, max internal connections from one client to one cluster is unlimited + public static final int DEFAULT_INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0; public static final boolean DEFAULT_STATS_COLLECTION_ENABLED = true; public static final boolean DEFAULT_USE_STATS_FOR_PARALLELIZATION = true;