PHOENIX-3663 - Implement resource controls in Phoenix JDBC driver.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cbc43bbb Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cbc43bbb Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cbc43bbb Branch: refs/heads/calcite Commit: cbc43bbb6fbcbd3bdcb5a246d02edecb0e939b43 Parents: 8e1d10b Author: Geoffrey Jacoby <[email protected]> Authored: Tue Feb 28 16:34:26 2017 -0800 Committer: Geoffrey Jacoby <[email protected]> Committed: Tue Feb 28 16:34:26 2017 -0800 ---------------------------------------------------------------------- .../phoenix/monitoring/PhoenixMetricsIT.java | 39 +++++++++++++++++++- .../phoenix/exception/SQLExceptionCode.java | 5 ++- .../apache/phoenix/jdbc/PhoenixConnection.java | 2 + .../org/apache/phoenix/jdbc/PhoenixDriver.java | 3 -- .../phoenix/monitoring/GlobalClientMetrics.java | 8 +++- .../apache/phoenix/monitoring/MetricType.java | 5 ++- .../query/ConnectionQueryServicesImpl.java | 16 +++++++- .../org/apache/phoenix/query/QueryServices.java | 5 ++- .../phoenix/query/QueryServicesOptions.java | 4 +- 9 files changed, 75 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index 4d075ab..04d125a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -17,6 +17,8 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER; @@ -55,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.jdbc.PhoenixResultSet; @@ -850,7 +853,41 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { exec.shutdownNow(); } } - + + @Test + public void testGetConnectionsThrottledForSameUrl() throws Exception { + int expectedPhoenixConnections = 11; + List<Connection> connections = Lists.newArrayList(); + String zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort(); + String url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum + + ':' + CUSTOM_URL_STRING + '=' + "throttletest"; + + Properties props = new Properties(); + props.setProperty(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, "10"); + + GLOBAL_HCONNECTIONS_COUNTER.getMetric().reset(); + GLOBAL_QUERY_SERVICES_COUNTER.getMetric().reset(); + GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.getMetric().reset(); + GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.getMetric().reset(); + boolean wasThrottled = false; + try { + for (int k = 0; k < expectedPhoenixConnections; k++) { + connections.add(DriverManager.getConnection(url, props)); + } + } catch (SQLException se) { + wasThrottled = true; + assertEquals(SQLExceptionCode.NEW_CONNECTION_THROTTLED.getErrorCode(), se.getErrorCode()); + } finally { + for (Connection c : connections) { + c.close(); + } + } + assertEquals(1, GLOBAL_QUERY_SERVICES_COUNTER.getMetric().getValue()); + assertTrue("No connection was throttled!", wasThrottled); + assertEquals(1, GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.getMetric().getValue()); + assertEquals(expectedPhoenixConnections, GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.getMetric().getValue()); + } + @Test public void testGetConnectionsForDifferentTenantsConcurrently() throws Exception { // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 8595eda..1e48640 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -426,7 +426,10 @@ public enum SQLExceptionCode { "Cannot create schema because config " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED + " for enabling name space mapping isn't enabled."), INCONSISTENET_NAMESPACE_MAPPING_PROPERTIES( 726, "43M10", " Inconsistent namespace mapping properites.."), ASYNC_NOT_ALLOWED( - 727, "43M11", " ASYNC option is not allowed.. "); + 727, "43M11", " ASYNC option is not allowed.. "), + NEW_CONNECTION_THROTTLED(728, "410M1", "Could not create connection " + + "because this client already has the maximum number" + + " of connections to the target cluster."); private final int errorCode; private final String sqlState; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index cb2390e..5f5237f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -20,6 +20,7 @@ package org.apache.phoenix.jdbc; import static com.google.common.base.Preconditions.checkNotNull; import static java.util.Collections.emptyMap; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER; import java.io.EOFException; import java.io.IOException; @@ -214,6 +215,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade) throws SQLException { + GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment(); this.url = url; this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade; // Copy so client cannot change http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java index b2acacf..67ac9c9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java @@ -149,8 +149,6 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver { private Cache<ConnectionInfo, ConnectionQueryServices> initializeConnectionCache() { Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); - int maxCacheSize = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_SIZE, - QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE); int maxCacheDuration = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS, QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION); RemovalListener<ConnectionInfo, ConnectionQueryServices> cacheRemovalListener = @@ -170,7 +168,6 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver { } }; return CacheBuilder.newBuilder() - .maximumSize(maxCacheSize) .expireAfterAccess(maxCacheDuration, TimeUnit.MILLISECONDS) .removalListener(cacheRemovalListener) .build(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java index fab4d27..b5f9422 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java @@ -39,7 +39,8 @@ import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME; import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME; import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME; - +import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -77,7 +78,10 @@ public enum GlobalClientMetrics { GLOBAL_SPOOL_FILE_COUNTER(SPOOL_FILE_COUNTER), GLOBAL_OPEN_PHOENIX_CONNECTIONS(OPEN_PHOENIX_CONNECTIONS_COUNTER), GLOBAL_QUERY_SERVICES_COUNTER(QUERY_SERVICES_COUNTER), - GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER); + GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER), + GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(PHOENIX_CONNECTIONS_THROTTLED_COUNTER), + GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER(PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER); + private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled(); private GlobalMetric metric; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java index b420b75..7b21de5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -43,7 +43,10 @@ public enum MetricType { RESULT_SET_TIME_MS("Wall clock time elapsed for reading all records using resultSet.next()"), OPEN_PHOENIX_CONNECTIONS_COUNTER("Number of open phoenix connections"), QUERY_SERVICES_COUNTER("Number of ConnectionQueryServicesImpl instantiated"), - HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver"); + HCONNECTIONS_COUNTER("Number of HConnections created by phoenix driver"), + PHOENIX_CONNECTIONS_THROTTLED_COUNTER("Number of client Phoenix connections prevented from opening " + + "because there are already too many to that target cluster."), + PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("Number of requests for Phoenix connections, whether successful or not."); private final String description; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 2329432..03a5e13 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -49,6 +49,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER; import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; @@ -307,6 +308,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final boolean renewLeaseEnabled; private final boolean isAutoUpgradeEnabled; private final AtomicBoolean upgradeRequired = new AtomicBoolean(false); + private final int maxConnectionsAllowed; + private final boolean shouldThrottleNumConnections; public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes(); public static final byte[] UPGRADE_MUTEX_LOCKED = "UPGRADE_MUTEX_LOCKED".getBytes(); public static final byte[] UPGRADE_MUTEX_UNLOCKED = "UPGRADE_MUTEX_UNLOCKED".getBytes(); @@ -387,6 +390,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // A little bit of a smell to leak `this` here, but should not be a problem this.tableStatsCache = new GuidePostsCache(this, config); this.isAutoUpgradeEnabled = config.getBoolean(AUTO_UPGRADE_ENABLED, QueryServicesOptions.DEFAULT_AUTO_UPGRADE_ENABLED); + this.maxConnectionsAllowed = config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, + QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS); + this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0); + } @Override @@ -3796,12 +3803,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement @Override public void addConnection(PhoenixConnection connection) throws SQLException { - connectionQueues.get(getQueueIndex(connection)).add(new WeakReference<PhoenixConnection>(connection)); - if (returnSequenceValues) { + if (returnSequenceValues || shouldThrottleNumConnections) { synchronized (connectionCountLock) { + if (shouldThrottleNumConnections && connectionCount + 1 > maxConnectionsAllowed){ + GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER.increment(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.NEW_CONNECTION_THROTTLED). + build().buildException(); + } connectionCount++; } } + connectionQueues.get(getQueueIndex(connection)).add(new WeakReference<PhoenixConnection>(connection)); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 1366add..0b7b737 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -238,9 +238,12 @@ public interface QueryServices extends SQLCloseable { public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding"; public static final String AUTO_UPGRADE_ENABLED = "phoenix.autoupgrade.enabled"; - public static final String CLIENT_CONNECTION_CACHE_MAX_SIZE = "phoenix.client.connection.cache.max.size"; public static final String CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS = "phoenix.client.connection.max.duration"; + + //max number of connections from a single client to a single cluster. 0 is unlimited. + public static final String CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = + "phoenix.client.connection.max.allowed.connections"; public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB = "phoenix.default.column.encoded.bytes.attrib"; public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.immutable.storage.scheme"; public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.multitenant.immutable.storage.scheme"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cbc43bbb/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index f885d5c..4fd1344 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -268,12 +268,14 @@ public class QueryServicesOptions { public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString(); public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true; - public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE = 100; public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION = 86400000; public static final int DEFAULT_COLUMN_ENCODED_BYTES = QualifierEncodingScheme.TWO_BYTE_QUALIFIERS.getSerializedMetadataValue(); public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS.toString(); public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.ONE_CELL_PER_COLUMN.toString(); + //by default, max connections from one client to one cluster is unlimited + public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0; + @SuppressWarnings("serial") public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() { {
