This is an automated email from the ASF dual-hosted git repository. apurtell pushed a commit to branch PHOENIX-7562-feature in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit bd2e226afe51effe5cfc309358a73e9082c9586f Author: ritegarg <58840065+riteg...@users.noreply.github.com> AuthorDate: Wed May 14 10:46:35 2025 -0700 PHOENIX-7605 Adding ability to configure threadpool at CQSI level (#2138) Co-authored-by: Ritesh Garg <ritesh.g...@riteshg-ltmd34g.internal.salesforce.com> --- .../phoenix/query/ConnectionQueryServicesImpl.java | 118 +++++++-- .../apache/phoenix/query/HConnectionFactory.java | 19 ++ .../org/apache/phoenix/query/HTableFactory.java | 8 +- .../org/apache/phoenix/query/QueryServices.java | 10 + .../apache/phoenix/query/QueryServicesOptions.java | 26 +- .../phoenix/jdbc/FailoverPhoenixConnectionIT.java | 31 +++ .../phoenix/jdbc/ParallelPhoenixConnectionIT.java | 54 ++++ .../ConnectionQueryServicesImplThreadPoolIT.java | 285 +++++++++++++++++++++ .../java/org/apache/phoenix/query/BaseTest.java | 8 + .../query/ConnectionQueryServicesImplTest.java | 53 +++- 10 files changed, 583 insertions(+), 29 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 762eccf054..a5998750dd 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -81,11 +81,17 @@ import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_VIEW_TTL_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_MAX_THREADS; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_QUERY_SERVICES_NAME; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS; -import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_TIMEOUT_DURING_UPGRADE_MS; import static org.apache.phoenix.util.UpgradeUtil.addParentToChildLinks; import static org.apache.phoenix.util.UpgradeUtil.addViewIndexToParentLinks; @@ -118,6 +124,7 @@ import java.util.Objects; import java.util.Properties; import java.util.Random; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -132,6 +139,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -185,6 +193,7 @@ import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.ipc.RemoteException; @@ -417,6 +426,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private Connection invalidateMetadataCacheConnection = null; private final Object invalidateMetadataCacheConnLock = new Object(); private MetricsMetadataCachingSource metricsMetadataCachingSource; + private ThreadPoolExecutor threadPoolExecutor = null; + private static final AtomicInteger threadPoolNumber = new AtomicInteger(1); public static final String INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE = "Cannot invalidate server metadata cache on a non-server connection"; @@ -451,9 +462,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement /** * Construct a ConnectionQueryServicesImpl that represents a connection to an HBase * cluster. - * @param services base services from where we derive our default configuration + * + * @param services base services from where we derive our default configuration * @param connectionInfo to provide connection information - * @param info hbase configuration properties + * @param info hbase configuration properties */ public ConnectionQueryServicesImpl(QueryServices services, ConnectionInfo connectionInfo, Properties info) { super(services); @@ -477,23 +489,73 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement this.connectionInfo = connectionInfo; // Without making a copy of the configuration we cons up, we lose some of our properties - // on the server side during testing. - this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config); + // on the server side during testing. This allows the application overridden + // ConfigurationFactory to inject/modify configs + Configuration finalConfig = HBaseFactoryProvider + .getConfigurationFactory().getConfiguration(config); + this.config = finalConfig; + + if (finalConfig.getBoolean(CQSI_THREAD_POOL_ENABLED, DEFAULT_CQSI_THREAD_POOL_ENABLED)) { + final int keepAlive = finalConfig.getInt(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, + DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS); + final int corePoolSize = finalConfig.getInt(CQSI_THREAD_POOL_CORE_POOL_SIZE, + DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE); + final int maxThreads = finalConfig.getInt(CQSI_THREAD_POOL_MAX_THREADS, + DEFAULT_CQSI_THREAD_POOL_MAX_THREADS); + final int maxQueue = finalConfig.getInt(CQSI_THREAD_POOL_MAX_QUEUE, + DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE); + final String threadPoolName = connectionInfo.getPrincipal() != null + ? connectionInfo.getPrincipal() + : DEFAULT_QUERY_SERVICES_NAME; + // Based on implementations used in + // org.apache.hadoop.hbase.client.ConnectionImplementation + final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(maxQueue); + this.threadPoolExecutor = + new ThreadPoolExecutor(corePoolSize, maxThreads, keepAlive, TimeUnit.SECONDS, + workQueue, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("CQSI-" + threadPoolName + + "-" + threadPoolNumber.incrementAndGet() + + "-shared-pool-%d") + .setUncaughtExceptionHandler( + Threads.LOGGING_EXCEPTION_HANDLER) + .build()); + this.threadPoolExecutor.allowCoreThreadTimeOut(finalConfig + .getBoolean(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, + DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT)); + LOGGER.info("For ConnectionQueryService = {} , " + + "CQSI ThreadPool Configs {} = {}, {} = {}, {} = {}, {} = {}, {} = {}", + threadPoolName, + CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, + finalConfig.get(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS), + CQSI_THREAD_POOL_CORE_POOL_SIZE, + finalConfig.get(CQSI_THREAD_POOL_CORE_POOL_SIZE), + CQSI_THREAD_POOL_MAX_THREADS, + finalConfig.get(CQSI_THREAD_POOL_MAX_THREADS), + CQSI_THREAD_POOL_MAX_QUEUE, + finalConfig.get(CQSI_THREAD_POOL_MAX_QUEUE), + CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, + finalConfig.get(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT)); + } + + LOGGER.info( - "CQS Configs {} = {} , {} = {} , {} = {} , {} = {} , {} = {} , {} = {} , {} = {}", + "CQS Configs {} = {} , {} = {} , {} = {} , {} = {} , {} = {} , {} = {} , {} = {}, {} = {}", HConstants.ZOOKEEPER_QUORUM, - this.config.get(HConstants.ZOOKEEPER_QUORUM), HConstants.CLIENT_ZOOKEEPER_QUORUM, - this.config.get(HConstants.CLIENT_ZOOKEEPER_QUORUM), + finalConfig.get(HConstants.ZOOKEEPER_QUORUM), HConstants.CLIENT_ZOOKEEPER_QUORUM, + finalConfig.get(HConstants.CLIENT_ZOOKEEPER_QUORUM), HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, - this.config.get(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT), + finalConfig.get(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT), HConstants.ZOOKEEPER_CLIENT_PORT, - this.config.get(HConstants.ZOOKEEPER_CLIENT_PORT), + finalConfig.get(HConstants.ZOOKEEPER_CLIENT_PORT), RPCConnectionInfo.BOOTSTRAP_NODES, - this.config.get(RPCConnectionInfo.BOOTSTRAP_NODES), - HConstants.MASTER_ADDRS_KEY, this.config.get(HConstants.MASTER_ADDRS_KEY), + finalConfig.get(RPCConnectionInfo.BOOTSTRAP_NODES), + HConstants.MASTER_ADDRS_KEY, finalConfig.get(HConstants.MASTER_ADDRS_KEY), ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, - this.config.get(ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY)); + finalConfig.get(ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY), + QueryServices.CQSI_THREAD_POOL_ENABLED, + finalConfig.get(QueryServices.CQSI_THREAD_POOL_ENABLED)); //Set the rpcControllerFactory if it is a server side connnection. boolean isServerSideConnection = config.getBoolean(QueryUtil.IS_SERVER_CONNECTION, false); @@ -501,8 +563,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement this.serverSideRPCControllerFactory = new ServerSideRPCControllerFactory(config); } // set replication required parameter - ConfigUtil.setReplicationConfigIfAbsent(this.config); - this.props = new ReadOnlyProps(this.config.iterator()); + ConfigUtil.setReplicationConfigIfAbsent(finalConfig); + this.props = new ReadOnlyProps(finalConfig.iterator()); this.userName = connectionInfo.getPrincipal(); this.user = connectionInfo.getUser(); this.latestMetaData = newEmptyMetaData(); @@ -557,17 +619,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement .build(); } - if (this.config.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { + if (finalConfig.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { // "hbase.client.metrics.scope" defined on // org.apache.hadoop.hbase.client.MetricsConnection#METRICS_SCOPE_KEY // however we cannot use the constant directly as long as we support HBase 2.4 profile. - this.config.set("hbase.client.metrics.scope", config.get(QUERY_SERVICES_NAME)); + finalConfig.set("hbase.client.metrics.scope", config.get(QUERY_SERVICES_NAME)); } if (!QueryUtil.isServerConnection(props)) { //Start queryDistruptor everytime as log level can be change at connection level as well, but we can avoid starting for server connections. try { - this.queryDisruptor = new QueryLoggerDisruptor(this.config); + this.queryDisruptor = new QueryLoggerDisruptor(finalConfig); } catch (SQLException e) { LOGGER.warn("Unable to initiate query logging service !!"); e.printStackTrace(); @@ -582,7 +644,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private Connection openConnection(Configuration conf) throws SQLException { Connection localConnection; try { - localConnection = HBaseFactoryProvider.getHConnectionFactory().createConnection(conf); + localConnection = HBaseFactoryProvider.getHConnectionFactory() + .createConnection(conf, threadPoolExecutor); GLOBAL_HCONNECTIONS_COUNTER.increment(); LOGGER.info("HConnection established. Stacktrace for informational purposes: " + localConnection + " " + LogUtil.getCallerStackTrace()); @@ -634,7 +697,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public Table getTable(byte[] tableName) throws SQLException { try { return HBaseFactoryProvider.getHTableFactory().getTable(tableName, - connection, null); + connection, threadPoolExecutor); } catch (IOException e) { throw new SQLException(e); } @@ -760,6 +823,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { tableStatsCache.invalidateAll(); super.close(); + shutdownThreadPool(this.threadPoolExecutor); } catch (SQLException e) { if (sqlE == null) { sqlE = e; @@ -774,6 +838,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } + // Based on org.apache.hadoop.hbase.client.ConnectionImplementation + private void shutdownThreadPool(ThreadPoolExecutor pool) { + if (pool != null && !pool.isShutdown()) { + pool.shutdown(); + try { + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); + } + } catch (InterruptedException e) { + pool.shutdownNow(); + } + } + } + protected ConnectionQueryServices newChildQueryService() { return new ChildQueryServices(this); } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/HConnectionFactory.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/HConnectionFactory.java index c3e0eb54dc..b296b35892 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/HConnectionFactory.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/HConnectionFactory.java @@ -18,6 +18,7 @@ package org.apache.phoenix.query; import java.io.IOException; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -38,6 +39,18 @@ public interface HConnectionFactory { */ Connection createConnection(Configuration conf) throws IOException; + /** + * Creates HConnection to access HBase clusters. + * + * @param conf object + * @param pool object + * @return A HConnection instance + */ + default Connection createConnection(Configuration conf, ExecutorService pool) + throws IOException { + return createConnection(conf); + } + /** * Default implementation. Uses standard HBase HConnections. */ @@ -46,5 +59,11 @@ public interface HConnectionFactory { public Connection createConnection(Configuration conf) throws IOException { return ConnectionFactory.createConnection(conf); } + + @Override + public Connection createConnection(Configuration conf, ExecutorService pool) + throws IOException { + return ConnectionFactory.createConnection(conf, pool); + } } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/HTableFactory.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/HTableFactory.java index 10a531f198..79607df8e7 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/HTableFactory.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/HTableFactory.java @@ -47,9 +47,11 @@ public interface HTableFactory { */ static class HTableFactoryImpl implements HTableFactory { @Override - public Table getTable(byte[] tableName, Connection connection, ExecutorService pool) throws IOException { - // Let the HBase client manage the thread pool instead of passing ours through - return connection.getTable(TableName.valueOf(tableName)); + public Table getTable(byte[] tableName, Connection connection, ExecutorService pool) + throws IOException { + // If CQSI_THREAD_POOL_ENABLED then we pass ExecutorService created in CQSI to + // HBase Client, else it is null(default), let the HBase client manage the thread pool + return connection.getTable(TableName.valueOf(tableName), pool); } } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index bc9320723c..772eac78c5 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -384,6 +384,16 @@ public interface QueryServices extends SQLCloseable { public static final String PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT = "phoenix.view.ttl.tenant_views_per_scan.limit"; // Block mutations based on cluster role record public static final String CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = "phoenix.cluster.role.based.mutation.block.enabled"; + //Enable Thread Pool Creation in CQSI to be used for HBase Client. + String CQSI_THREAD_POOL_ENABLED = "phoenix.cqsi.thread.pool.enabled"; + //CQSI Thread Pool Related Configuration. + String CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = "phoenix.cqsi.thread.pool.keepalive.seconds"; + String CQSI_THREAD_POOL_CORE_POOL_SIZE = "phoenix.cqsi.thread.pool.core.size"; + String CQSI_THREAD_POOL_MAX_THREADS = "phoenix.cqsi.thread.pool.max.threads"; + String CQSI_THREAD_POOL_MAX_QUEUE = "phoenix.cqsi.thread.pool.max.queue"; + // Enables https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html#allowCoreThreadTimeOut-boolean- + String CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT + = "phoenix.cqsi.thread.pool.allow.core.thread.timeout"; // Before 4.15 when we created a view we included the parent table column metadata in the view // metadata. After PHOENIX-3534 we allow SYSTEM.CATALOG to split and no longer store the parent diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 3bd94e4ffa..ca637dbf90 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -75,6 +75,12 @@ import static org.apache.phoenix.query.QueryServices.METRIC_PUBLISHER_ENABLED; import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS; import static org.apache.phoenix.query.QueryServices.PHOENIX_ACLS_ENABLED; import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME; import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB; @@ -454,6 +460,12 @@ public class QueryServicesOptions { public static final Boolean DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false; + public static final Boolean DEFAULT_CQSI_THREAD_POOL_ENABLED = false; + public static final int DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = 60; + public static final int DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE = 25; + public static final int DEFAULT_CQSI_THREAD_POOL_MAX_THREADS = 25; + public static final int DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE = 512; + public static final Boolean DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true; private final Configuration config; @@ -558,8 +570,18 @@ public class QueryServicesOptions { DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX) .setIfUnset(MAX_IN_LIST_SKIP_SCAN_SIZE, DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE) .setIfUnset(CONNECTION_ACTIVITY_LOGGING_ENABLED, DEFAULT_CONNECTION_ACTIVITY_LOGGING_ENABLED) - .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL, DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS) - .setIfUnset(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED); + .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL, + DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS) + .setIfUnset(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, + DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED) + .setIfUnset(CQSI_THREAD_POOL_ENABLED, DEFAULT_CQSI_THREAD_POOL_ENABLED) + .setIfUnset(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, + DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS) + .setIfUnset(CQSI_THREAD_POOL_CORE_POOL_SIZE, DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE) + .setIfUnset(CQSI_THREAD_POOL_MAX_THREADS, DEFAULT_CQSI_THREAD_POOL_MAX_THREADS) + .setIfUnset(CQSI_THREAD_POOL_MAX_QUEUE, DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE) + .setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, + DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java index ab26000567..73da203c6c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java @@ -25,9 +25,17 @@ import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestin import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR; import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithStatement; import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup; +import static org.apache.phoenix.query.BaseTest.extractThreadPoolExecutorFromCQSI; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; @@ -48,6 +56,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -113,6 +122,12 @@ public class FailoverPhoenixConnectionIT { haGroupName = testName.getMethodName(); clientProperties = HighAvailabilityTestingUtility.getHATestProperties(); clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + clientProperties.setProperty(CQSI_THREAD_POOL_ENABLED, String.valueOf(true)); + clientProperties.setProperty(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, String.valueOf(13)); + clientProperties.setProperty(CQSI_THREAD_POOL_CORE_POOL_SIZE, String.valueOf(17)); + clientProperties.setProperty(CQSI_THREAD_POOL_MAX_THREADS, String.valueOf(19)); + clientProperties.setProperty(CQSI_THREAD_POOL_MAX_QUEUE, String.valueOf(23)); + clientProperties.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, String.valueOf(true)); // Make first cluster ACTIVE CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER); @@ -138,6 +153,22 @@ public class FailoverPhoenixConnectionIT { } } + @Test + public void testCQSIThreadPoolCreation() throws Exception { + try (Connection conn = createFailoverConnection()) { + FailoverPhoenixConnection failoverConn = conn.unwrap(FailoverPhoenixConnection.class); + + // verify connection#1 + ConnectionQueryServices cqsi = PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl1(haGroup), clientProperties); + ConnectionQueryServices cqsiFromConn = failoverConn.getWrappedConnection().getQueryServices(); + // Check that same ThreadPoolExecutor object is used for CQSIs + Assert.assertSame(extractThreadPoolExecutorFromCQSI(cqsi), extractThreadPoolExecutorFromCQSI(cqsiFromConn)); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + /** * Test Phoenix connection creation and basic operations with HBase cluster pair. */ diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java index aa2bf52a9a..7b98a50e3a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java @@ -34,13 +34,22 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALL import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_EXECUTION_TIME; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_QUEUE_WAIT_TIME; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_REJECTED_COUNTER; +import static org.apache.phoenix.query.BaseTest.extractThreadPoolExecutorFromCQSI; import static org.apache.phoenix.query.QueryServices.AUTO_COMMIT_ATTRIB; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; +import java.lang.reflect.Field; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -49,8 +58,10 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.test.GenericTestUtils; @@ -62,6 +73,8 @@ import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.monitoring.GlobalMetric; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.ConnectionQueryServicesImpl; +import org.apache.phoenix.query.HBaseFactoryProvider; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.JDBCUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -107,6 +120,13 @@ public class ParallelPhoenixConnectionIT { GLOBAL_PROPERTIES.setProperty(AUTO_COMMIT_ATTRIB, "true"); GLOBAL_PROPERTIES.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); GLOBAL_PROPERTIES.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name()); //Need logging for query metrics + GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ENABLED, String.valueOf(true)); + GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, String.valueOf(13)); + GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_CORE_POOL_SIZE, String.valueOf(17)); + GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_THREADS, String.valueOf(19)); + GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_QUEUE, String.valueOf(23)); + GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, String.valueOf(true)); + } @AfterClass @@ -172,6 +192,40 @@ public class ParallelPhoenixConnectionIT { } } + @Test + public void testDifferentCQSIThreadPoolsForParallelConnection() throws Exception { + try (Connection conn = getParallelConnection()) { + ParallelPhoenixConnection pr = conn.unwrap(ParallelPhoenixConnection.class); + PhoenixConnection pConn; + PhoenixConnection pConn2; + if (CLUSTERS.getJdbcUrl1(haGroup).equals(pr.getFutureConnection1().get().getURL())) { + pConn = pr.getFutureConnection1().get(); + pConn2 = pr.getFutureConnection2().get(); + } else { + pConn = pr.getFutureConnection2().get(); + pConn2 = pr.getFutureConnection1().get(); + } + + // verify connection#1 + ConnectionQueryServices cqsi = PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl1(haGroup), clientProperties); + ConnectionQueryServices cqsiFromConn = pConn.getQueryServices(); + // Check that same ThreadPoolExecutor object is used for CQSIs + ThreadPoolExecutor threadPoolExecutor1 = extractThreadPoolExecutorFromCQSI(cqsi); + Assert.assertSame(threadPoolExecutor1, extractThreadPoolExecutorFromCQSI(cqsiFromConn)); + + // verify connection#2 + cqsi = PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl2(haGroup), clientProperties); + cqsiFromConn = pConn2.getQueryServices(); + Assert.assertSame(cqsi, cqsiFromConn); + // Check that same ThreadPoolExecutor object is used for CQSIs + ThreadPoolExecutor threadPoolExecutor2 = extractThreadPoolExecutorFromCQSI(cqsi); + Assert.assertSame(extractThreadPoolExecutorFromCQSI(cqsi), extractThreadPoolExecutorFromCQSI(cqsiFromConn)); + + // Check that both threadPools for parallel connections are different. + assertNotSame(threadPoolExecutor1, threadPoolExecutor2); + } + } + /** * Test Phoenix connection creation and basic operations with HBase cluster(s) unavailable. */ diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionQueryServicesImplThreadPoolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionQueryServicesImplThreadPoolIT.java new file mode 100644 index 0000000000..f2edfcb9cd --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionQueryServicesImplThreadPoolIT.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.query; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.ConnectionImplementation; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl; +import org.apache.phoenix.jdbc.ConnectionInfo; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.util.InstanceResolver; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Field; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +@Category(NeedsOwnMiniClusterTest.class) +public class ConnectionQueryServicesImplThreadPoolIT extends BaseTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(ConnectionQueryServicesImplThreadPoolIT.class); + private AtomicInteger counter = new AtomicInteger(); + private static HBaseTestingUtility hbaseTestUtil; + private String tableName; + private static final String CONN_QUERY_SERVICE_CREATE_TABLE = "CONN_QUERY_SERVICE_CREATE_TABLE"; + private static final String CONN_QUERY_SERVICE_1 = "CONN_QUERY_SERVICE_1"; + private static final String CONN_QUERY_SERVICE_2 = "CONN_QUERY_SERVICE_2"; + private static final int TEST_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = 13; + private static final int TEST_CQSI_THREAD_POOL_CORE_POOL_SIZE = 17; + private static final int TEST_CQSI_THREAD_POOL_MAX_THREADS = 19; + private static final int TEST_CQSI_THREAD_POOL_MAX_QUEUE = 23; + + + + @BeforeClass + public static void doSetup() throws Exception { + InstanceResolver.clearSingletons(); + InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() { + @Override public Configuration getConfiguration() { + Configuration conf = HBaseConfiguration.create(); + conf.set(CQSI_THREAD_POOL_ENABLED, Boolean.toString(true)); + conf.set(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, Integer.toString(TEST_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS)); + conf.set(CQSI_THREAD_POOL_CORE_POOL_SIZE, Integer.toString(TEST_CQSI_THREAD_POOL_CORE_POOL_SIZE)); + conf.set(CQSI_THREAD_POOL_MAX_THREADS, Integer.toString(TEST_CQSI_THREAD_POOL_MAX_THREADS)); + conf.set(CQSI_THREAD_POOL_MAX_QUEUE, Integer.toString(TEST_CQSI_THREAD_POOL_MAX_QUEUE)); + conf.set(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, Boolean.toString(true)); + return conf; + } + + @Override public Configuration getConfiguration(Configuration confToClone) { + Configuration conf = HBaseConfiguration.create(); + conf.set(CQSI_THREAD_POOL_ENABLED, Boolean.toString(true)); + conf.set(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, Integer.toString(TEST_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS)); + conf.set(CQSI_THREAD_POOL_CORE_POOL_SIZE, Integer.toString(TEST_CQSI_THREAD_POOL_CORE_POOL_SIZE)); + conf.set(CQSI_THREAD_POOL_MAX_THREADS, Integer.toString(TEST_CQSI_THREAD_POOL_MAX_THREADS)); + conf.set(CQSI_THREAD_POOL_MAX_QUEUE, Integer.toString(TEST_CQSI_THREAD_POOL_MAX_QUEUE)); + conf.set(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, Boolean.toString(true)); + Configuration copy = new Configuration(conf); + copy.addResource(confToClone); + return copy; + } + }); + Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); + hbaseTestUtil = new HBaseTestingUtility(conf); + setUpConfigForMiniCluster(conf); + hbaseTestUtil.startMiniCluster(); + String zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); + url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum; + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @AfterClass + public static void tearDownMiniCluster() { + try { + if (hbaseTestUtil != null) { + hbaseTestUtil.shutdownMiniCluster(); + } + } catch (Exception e) { + // ignore + } finally { + ServerMetadataCacheTestImpl.resetCache(); + } + } + + @Before + public void setUp() throws Exception { + tableName = generateUniqueName(); + createTable(tableName); + } + + private String connUrlWithPrincipal(String principalName) throws SQLException { + return ConnectionInfo.create(url, null, null).withPrincipal(principalName).toUrl(); + } + + @Test + public void checkHTableThreadPoolExecutorSame() throws Exception { + Table table = createCQSI(null).getTable(tableName.getBytes()); + assertTrue(table instanceof HTable); + HTable hTable = (HTable) table; + Field props = hTable.getClass().getDeclaredField("pool"); + props.setAccessible(true); + validateThreadPoolExecutor((ThreadPoolExecutor) props.get(hTable)); + } + + @Test + public void checkHConnectionThreadPoolExecutorSame() throws Exception { + // Extract Conn1 instance from CQSI1 + ConnectionImplementation conn1 = extractConnectionFromCQSI(createCQSI("hello")); + // Extract batchPool from connection in CQSI1 + ThreadPoolExecutor threadPoolExecutor1FromConnection = extractBatchPool(conn1); + // Create another CQSI2 + ConnectionQueryServices connQueryServices2 = createCQSI("bye"); + // Extract the ThreadPoolExecutor from CQSI2 instance + ThreadPoolExecutor threadPoolExecutor2 = extractThreadPoolExecutorFromCQSI(connQueryServices2); + // Extract Conn2 from CQSI2 + ConnectionImplementation conn2 = extractConnectionFromCQSI(createCQSI("bye")); + // Extract batchPool from connection2 in CQSI2 + ThreadPoolExecutor threadPoolExecutor2FromConnection = extractBatchPool(conn2); + // Check if ThreadPoolExecutor2 from CQSI and from Connection are Same + assertSame(threadPoolExecutor2, threadPoolExecutor2FromConnection); + // Check if threadPoolExecutor from connection1 and from conn2 are different + assertNotSame(threadPoolExecutor1FromConnection, threadPoolExecutor2FromConnection); + + // Validate the properties for ThreadPoolExecutors + validateThreadPoolExecutor(threadPoolExecutor1FromConnection); + validateThreadPoolExecutor(threadPoolExecutor2FromConnection); + validateThreadPoolExecutor(threadPoolExecutor2); + } + + private static ThreadPoolExecutor extractBatchPool(ConnectionImplementation conn) throws NoSuchFieldException, IllegalAccessException { + Field batchPoolField = conn.getClass().getDeclaredField("batchPool"); + batchPoolField.setAccessible(true); + return (ThreadPoolExecutor) batchPoolField.get(conn); + } + + @Test + public void testMultipleCQSIThreadPoolsInParallel() throws Exception { + ConnectionQueryServices cqsiExternal1 = createCQSI(CONN_QUERY_SERVICE_1); + ConnectionQueryServices cqsiExternal2 = createCQSI(CONN_QUERY_SERVICE_2); + Thread cqsiThread1 = new Thread(() -> { + try { + ConnectionQueryServices cqsi = createCQSI(CONN_QUERY_SERVICE_1); + checkSameThreadPool(cqsiExternal1, cqsi); + checkDifferentThreadPool(cqsiExternal2, cqsi); + validateThreadPoolExecutor(extractThreadPoolExecutorFromCQSI(cqsi)); + counter.incrementAndGet(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + Thread cqsiThread2 = new Thread(() -> { + try { + ConnectionQueryServices cqsi = createCQSI(CONN_QUERY_SERVICE_1); + checkSameThreadPool(cqsiExternal1, cqsi); + checkDifferentThreadPool(cqsiExternal2, cqsi); + validateThreadPoolExecutor(extractThreadPoolExecutorFromCQSI(cqsi)); + counter.incrementAndGet(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + Thread cqsiThread3 = new Thread(() -> { + try { + ConnectionQueryServices cqsi = createCQSI(CONN_QUERY_SERVICE_2); + checkSameThreadPool(cqsiExternal2, cqsi); + checkDifferentThreadPool(cqsiExternal1, cqsi); + validateThreadPoolExecutor(extractThreadPoolExecutorFromCQSI(cqsi)); + counter.incrementAndGet(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + Thread cqsiThread4 = new Thread(() -> { + try { + ConnectionQueryServices cqsi = createCQSI(CONN_QUERY_SERVICE_2); + checkSameThreadPool(cqsiExternal2, cqsi); + checkDifferentThreadPool(cqsiExternal1, cqsi); + validateThreadPoolExecutor(extractThreadPoolExecutorFromCQSI(cqsi)); + counter.incrementAndGet(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + + cqsiThread1.start(); + cqsiThread2.start(); + cqsiThread3.start(); + cqsiThread4.start(); + cqsiThread1.join(); + cqsiThread2.join(); + cqsiThread3.join(); + cqsiThread4.join(); + + assertEquals(4, counter.get()); + } + + private void checkSameThreadPool(ConnectionQueryServices cqsi1, ConnectionQueryServices cqsi2) throws NoSuchFieldException, IllegalAccessException { + assertSame(extractThreadPoolExecutorFromCQSI(cqsi1), extractThreadPoolExecutorFromCQSI(cqsi2)); + } + + private void checkDifferentThreadPool(ConnectionQueryServices cqsi1, ConnectionQueryServices cqsi2) throws NoSuchFieldException, IllegalAccessException { + assertNotSame(extractThreadPoolExecutorFromCQSI(cqsi1), extractThreadPoolExecutorFromCQSI(cqsi2)); + } + + private ConnectionQueryServices createCQSI(String serviceName) throws SQLException { + String principalURL = connUrlWithPrincipal(serviceName); + Connection conn = DriverManager.getConnection(principalURL); + return conn.unwrap(PhoenixConnection.class).getQueryServices(); + } + + private void validateThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) { + assertEquals(TEST_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS)); + assertEquals(TEST_CQSI_THREAD_POOL_CORE_POOL_SIZE, threadPoolExecutor.getCorePoolSize()); + assertEquals(TEST_CQSI_THREAD_POOL_MAX_THREADS, threadPoolExecutor.getMaximumPoolSize()); + assertEquals(TEST_CQSI_THREAD_POOL_MAX_QUEUE, threadPoolExecutor.getQueue().remainingCapacity()); + } + + + private void createTable(String tableName) throws SQLException { + String CREATE_TABLE_DDL = "CREATE TABLE IF NOT EXISTS %s (K VARCHAR(10) NOT NULL" + + " PRIMARY KEY, V VARCHAR)"; + String princURL = connUrlWithPrincipal(CONN_QUERY_SERVICE_CREATE_TABLE); + LOGGER.info("Connection Query Service : " + CONN_QUERY_SERVICE_CREATE_TABLE + " URL : " + princURL); + try (Connection conn = DriverManager.getConnection(princURL); + Statement stmt = conn.createStatement()) { + stmt.execute(String.format(CREATE_TABLE_DDL, tableName)); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + + private ConnectionImplementation extractConnectionFromCQSI(ConnectionQueryServices cqsi) throws NoSuchFieldException, IllegalAccessException { + Field connectionField1 = cqsi.getClass().getDeclaredField("connection"); + connectionField1.setAccessible(true); + return (ConnectionImplementation) connectionField1.get(cqsi); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index a7fcfd2cb3..a888a8ac1a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -83,6 +83,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.lang.reflect.Constructor; +import java.lang.reflect.Field; import java.math.BigDecimal; import java.sql.Connection; import java.sql.DatabaseMetaData; @@ -110,6 +111,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -2188,4 +2190,10 @@ public abstract class BaseTest { } return false; } + + public static ThreadPoolExecutor extractThreadPoolExecutorFromCQSI(final ConnectionQueryServices cqs) throws NoSuchFieldException, IllegalAccessException { + Field props = cqs.getClass().getDeclaredField("threadPoolExecutor"); + props.setAccessible(true); + return (ThreadPoolExecutor) props.get(cqs); + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java index 048bf250aa..f25e7a04ed 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java @@ -21,13 +21,21 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_FOR_MUTEX; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; @@ -42,6 +50,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -58,6 +70,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.phoenix.SystemExitRule; import org.apache.phoenix.exception.PhoenixIOException; +import org.apache.phoenix.jdbc.ConnectionInfo; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.monitoring.GlobalClientMetrics; import org.apache.phoenix.util.ReadOnlyProps; @@ -134,6 +147,38 @@ public class ConnectionQueryServicesImplTest { doCallRealMethod().when(mockCqs).dropTables(Mockito.any()); } + @Test + public void testCQSIThreadPoolCreation() throws SQLException, NoSuchFieldException, IllegalAccessException { + QueryServices mockQueryServices = Mockito.mock(QueryServices.class); + ReadOnlyProps readOnlyProps = createCQSIThreadPoolReadOnlyProps(); + when(mockQueryServices.getProps()).thenReturn(readOnlyProps); + ConnectionInfo mockConnectionInfo = Mockito.mock(ConnectionInfo.class); + when(mockConnectionInfo.asProps()).thenReturn(readOnlyProps); + Properties properties = new Properties(); + ConnectionQueryServicesImpl cqs = new ConnectionQueryServicesImpl(mockQueryServices, mockConnectionInfo, properties); + Field props = cqs.getClass().getDeclaredField("threadPoolExecutor"); + props.setAccessible(true); + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) props.get(cqs); + assertNotNull(threadPoolExecutor); + assertEquals(readOnlyProps.getInt(CQSI_THREAD_POOL_CORE_POOL_SIZE, -1), threadPoolExecutor.getCorePoolSize()); + assertEquals(readOnlyProps.getInt(CQSI_THREAD_POOL_MAX_THREADS,-1), threadPoolExecutor.getMaximumPoolSize()); + assertEquals(LinkedBlockingQueue.class, threadPoolExecutor.getQueue().getClass()); + assertEquals(readOnlyProps.getInt(CQSI_THREAD_POOL_MAX_QUEUE, -1), threadPoolExecutor.getQueue().remainingCapacity()); + assertEquals(readOnlyProps.getInt(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, -1), threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS)); + assertTrue(threadPoolExecutor.allowsCoreThreadTimeOut()); + } + + private static ReadOnlyProps createCQSIThreadPoolReadOnlyProps() { + Map<String, String> props = new HashMap<>(); + props.put(CQSI_THREAD_POOL_ENABLED, Boolean.toString(true)); + props.put(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, Integer.toString(13)); + props.put(CQSI_THREAD_POOL_CORE_POOL_SIZE, Integer.toString(17)); + props.put(CQSI_THREAD_POOL_MAX_THREADS, Integer.toString(19)); + props.put(CQSI_THREAD_POOL_MAX_QUEUE, Integer.toString(23)); + props.put(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, Boolean.toString(true)); + return new ReadOnlyProps(props); + } + @SuppressWarnings("unchecked") @Test public void testExceptionHandlingOnSystemNamespaceCreation() throws Exception { @@ -330,26 +375,26 @@ public class ConnectionQueryServicesImplTest { public void testGetSysMutexTableWithName() throws Exception { when(mockAdmin.tableExists(any())).thenReturn(true); when(mockConn.getAdmin()).thenReturn(mockAdmin); - when(mockConn.getTable(TableName.valueOf("SYSTEM.MUTEX"))) + when(mockConn.getTable(eq(TableName.valueOf("SYSTEM.MUTEX")), any())) .thenReturn(mockTable); assertSame(mockCqs.getSysMutexTable(), mockTable); verify(mockAdmin, Mockito.times(1)).tableExists(any()); verify(mockConn, Mockito.times(1)).getAdmin(); verify(mockConn, Mockito.times(1)) - .getTable(TableName.valueOf("SYSTEM.MUTEX")); + .getTable(eq(TableName.valueOf("SYSTEM.MUTEX")), any()); } @Test public void testGetSysMutexTableWithNamespace() throws Exception { when(mockAdmin.tableExists(any())).thenReturn(false); when(mockConn.getAdmin()).thenReturn(mockAdmin); - when(mockConn.getTable(TableName.valueOf("SYSTEM:MUTEX"))) + when(mockConn.getTable(eq(TableName.valueOf("SYSTEM:MUTEX")), any())) .thenReturn(mockTable); assertSame(mockCqs.getSysMutexTable(), mockTable); verify(mockAdmin, Mockito.times(1)).tableExists(any()); verify(mockConn, Mockito.times(1)).getAdmin(); verify(mockConn, Mockito.times(1)) - .getTable(TableName.valueOf("SYSTEM:MUTEX")); + .getTable(eq(TableName.valueOf("SYSTEM:MUTEX")), any()); } @Test