This is an automated email from the ASF dual-hosted git repository.

jisaac pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 20e3bded8a PHOENIX-7605 Adding ability to configure threadpool at CQSI level (#2138)
20e3bded8a is described below

commit 20e3bded8adabbb501d41a7d7cb8d7782e7cba0e
Author: ritegarg <58840065+riteg...@users.noreply.github.com>
AuthorDate: Wed May 14 10:46:35 2025 -0700

    PHOENIX-7605 Adding ability to configure threadpool at CQSI level (#2138)
    
    Co-authored-by: Ritesh Garg <ritesh.g...@riteshg-ltmd34g.internal.salesforce.com>
---
 .../phoenix/query/ConnectionQueryServicesImpl.java | 118 +++++++--
 .../apache/phoenix/query/HConnectionFactory.java   |  19 ++
 .../org/apache/phoenix/query/HTableFactory.java    |   8 +-
 .../org/apache/phoenix/query/QueryServices.java    |  10 +
 .../apache/phoenix/query/QueryServicesOptions.java |  26 +-
 .../phoenix/jdbc/FailoverPhoenixConnectionIT.java  |  31 +++
 .../phoenix/jdbc/ParallelPhoenixConnectionIT.java  |  54 ++++
 .../ConnectionQueryServicesImplThreadPoolIT.java   | 285 +++++++++++++++++++++
 .../java/org/apache/phoenix/query/BaseTest.java    |   8 +
 .../query/ConnectionQueryServicesImplTest.java     |  53 +++-
 10 files changed, 583 insertions(+), 29 deletions(-)
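
For anyone who wants to exercise the new knobs from a client, here is a minimal sketch in Java. The property names are the ones introduced in the QueryServices.java hunk below; the JDBC URL, class name, and the specific values are placeholders, not recommendations.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    public class CqsiThreadPoolClientSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Opt in to the CQSI-owned thread pool (disabled by default).
            props.setProperty("phoenix.cqsi.thread.pool.enabled", "true");
            // Pool sizing; the values here are illustrative only.
            props.setProperty("phoenix.cqsi.thread.pool.keepalive.seconds", "60");
            props.setProperty("phoenix.cqsi.thread.pool.core.size", "25");
            props.setProperty("phoenix.cqsi.thread.pool.max.threads", "25");
            props.setProperty("phoenix.cqsi.thread.pool.max.queue", "512");
            props.setProperty("phoenix.cqsi.thread.pool.allow.core.thread.timeout", "true");

            // Placeholder URL; point this at your cluster.
            try (Connection conn =
                    DriverManager.getConnection("jdbc:phoenix:localhost:2181", props)) {
                // HBase Tables obtained through this connection's CQSI now share
                // the executor that CQSI builds at construction time.
            }
        }
    }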

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 762eccf054..a5998750dd 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -81,11 +81,17 @@ import static 
org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_VIEW_TTL_ENABLED;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_ENABLED;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_MAX_THREADS;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_QUERY_SERVICES_NAME;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
-import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_TIMEOUT_DURING_UPGRADE_MS;
 import static org.apache.phoenix.util.UpgradeUtil.addParentToChildLinks;
 import static org.apache.phoenix.util.UpgradeUtil.addViewIndexToParentLinks;
@@ -118,6 +124,7 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -132,6 +139,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -185,6 +193,7 @@ import 
org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.ipc.RemoteException;
@@ -417,6 +426,8 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     private Connection invalidateMetadataCacheConnection = null;
     private final Object invalidateMetadataCacheConnLock = new Object();
     private MetricsMetadataCachingSource metricsMetadataCachingSource;
+    private ThreadPoolExecutor threadPoolExecutor = null;
+    private static final AtomicInteger threadPoolNumber = new AtomicInteger(1);
     public static final String INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE =
             "Cannot invalidate server metadata cache on a non-server 
connection";
 
@@ -451,9 +462,10 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     /**
      * Construct a ConnectionQueryServicesImpl that represents a connection to 
an HBase
      * cluster.
-     * @param services base services from where we derive our default 
configuration
+     *
+     * @param services       base services from where we derive our default 
configuration
      * @param connectionInfo to provide connection information
-     * @param info hbase configuration properties
+     * @param info           hbase configuration properties
      */
     public ConnectionQueryServicesImpl(QueryServices services, ConnectionInfo 
connectionInfo, Properties info) {
         super(services);
@@ -477,23 +489,73 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         this.connectionInfo = connectionInfo;
 
         // Without making a copy of the configuration we cons up, we lose some 
of our properties
-        // on the server side during testing.
-        this.config = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
+        // on the server side during testing. This allows the application 
overridden
+        // ConfigurationFactory to inject/modify configs
+        Configuration finalConfig = HBaseFactoryProvider
+                                        
.getConfigurationFactory().getConfiguration(config);
+        this.config = finalConfig;
+
+        if (finalConfig.getBoolean(CQSI_THREAD_POOL_ENABLED, 
DEFAULT_CQSI_THREAD_POOL_ENABLED)) {
+            final int keepAlive = 
finalConfig.getInt(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS,
+                    DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS);
+            final int corePoolSize = 
finalConfig.getInt(CQSI_THREAD_POOL_CORE_POOL_SIZE,
+                    DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE);
+            final int maxThreads = 
finalConfig.getInt(CQSI_THREAD_POOL_MAX_THREADS,
+                    DEFAULT_CQSI_THREAD_POOL_MAX_THREADS);
+            final int maxQueue = finalConfig.getInt(CQSI_THREAD_POOL_MAX_QUEUE,
+                    DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE);
+            final String threadPoolName = connectionInfo.getPrincipal() != null
+                    ? connectionInfo.getPrincipal()
+                    : DEFAULT_QUERY_SERVICES_NAME;
+            // Based on implementations used in
+            // org.apache.hadoop.hbase.client.ConnectionImplementation
+            final BlockingQueue<Runnable> workQueue = new 
LinkedBlockingQueue<>(maxQueue);
+            this.threadPoolExecutor =
+                    new ThreadPoolExecutor(corePoolSize, maxThreads, 
keepAlive, TimeUnit.SECONDS,
+                            workQueue, new ThreadFactoryBuilder()
+                                        .setDaemon(true)
+                                        .setNameFormat("CQSI-" + threadPoolName
+                                                + "-" + 
threadPoolNumber.incrementAndGet()
+                                                + "-shared-pool-%d")
+                                        .setUncaughtExceptionHandler(
+                                                
Threads.LOGGING_EXCEPTION_HANDLER)
+                                        .build());
+            this.threadPoolExecutor.allowCoreThreadTimeOut(finalConfig
+                    .getBoolean(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+                    DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT));
+            LOGGER.info("For ConnectionQueryService = {} , " +
+                            "CQSI ThreadPool Configs {} = {}, {} = {}, {} = 
{}, {} = {}, {} = {}",
+                    threadPoolName,
+                    CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS,
+                    finalConfig.get(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS),
+                    CQSI_THREAD_POOL_CORE_POOL_SIZE,
+                    finalConfig.get(CQSI_THREAD_POOL_CORE_POOL_SIZE),
+                    CQSI_THREAD_POOL_MAX_THREADS,
+                    finalConfig.get(CQSI_THREAD_POOL_MAX_THREADS),
+                    CQSI_THREAD_POOL_MAX_QUEUE,
+                    finalConfig.get(CQSI_THREAD_POOL_MAX_QUEUE),
+                    CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+                    
finalConfig.get(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT));
+        }
+
+
 
         LOGGER.info(
-                "CQS Configs {} = {} , {} = {} , {} = {} , {} = {} , {} = {} , 
{} = {} , {} = {}",
+                "CQS Configs {} = {} , {} = {} , {} = {} , {} = {} , {} = {} , 
{} = {} , {} = {}, {} = {}",
                 HConstants.ZOOKEEPER_QUORUM,
-                this.config.get(HConstants.ZOOKEEPER_QUORUM), 
HConstants.CLIENT_ZOOKEEPER_QUORUM,
-                this.config.get(HConstants.CLIENT_ZOOKEEPER_QUORUM),
+                finalConfig.get(HConstants.ZOOKEEPER_QUORUM), 
HConstants.CLIENT_ZOOKEEPER_QUORUM,
+                finalConfig.get(HConstants.CLIENT_ZOOKEEPER_QUORUM),
                 HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT,
-                this.config.get(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT),
+                finalConfig.get(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT),
                 HConstants.ZOOKEEPER_CLIENT_PORT,
-                this.config.get(HConstants.ZOOKEEPER_CLIENT_PORT),
+                finalConfig.get(HConstants.ZOOKEEPER_CLIENT_PORT),
                 RPCConnectionInfo.BOOTSTRAP_NODES,
-                this.config.get(RPCConnectionInfo.BOOTSTRAP_NODES),
-                HConstants.MASTER_ADDRS_KEY, 
this.config.get(HConstants.MASTER_ADDRS_KEY),
+                finalConfig.get(RPCConnectionInfo.BOOTSTRAP_NODES),
+                HConstants.MASTER_ADDRS_KEY, 
finalConfig.get(HConstants.MASTER_ADDRS_KEY),
                 ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
-                
this.config.get(ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY));
+                
finalConfig.get(ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY),
+                QueryServices.CQSI_THREAD_POOL_ENABLED,
+                finalConfig.get(QueryServices.CQSI_THREAD_POOL_ENABLED));
 
         //Set the rpcControllerFactory if it is a server side connnection.
         boolean isServerSideConnection = 
config.getBoolean(QueryUtil.IS_SERVER_CONNECTION, false);
@@ -501,8 +563,8 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             this.serverSideRPCControllerFactory = new 
ServerSideRPCControllerFactory(config);
         }
         // set replication required parameter
-        ConfigUtil.setReplicationConfigIfAbsent(this.config);
-        this.props = new ReadOnlyProps(this.config.iterator());
+        ConfigUtil.setReplicationConfigIfAbsent(finalConfig);
+        this.props = new ReadOnlyProps(finalConfig.iterator());
         this.userName = connectionInfo.getPrincipal();
         this.user = connectionInfo.getUser();
         this.latestMetaData = newEmptyMetaData();
@@ -557,17 +619,17 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                     .build();
         }
 
-        if (this.config.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
+        if (finalConfig.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
             // "hbase.client.metrics.scope" defined on
             // 
org.apache.hadoop.hbase.client.MetricsConnection#METRICS_SCOPE_KEY
             // however we cannot use the constant directly as long as we 
support HBase 2.4 profile.
-            this.config.set("hbase.client.metrics.scope", 
config.get(QUERY_SERVICES_NAME));
+            finalConfig.set("hbase.client.metrics.scope", 
config.get(QUERY_SERVICES_NAME));
         }
 
         if (!QueryUtil.isServerConnection(props)) {
             //Start queryDistruptor everytime as log level can be change at 
connection level as well, but we can avoid starting for server connections.
             try {
-                this.queryDisruptor = new QueryLoggerDisruptor(this.config);
+                this.queryDisruptor = new QueryLoggerDisruptor(finalConfig);
             } catch (SQLException e) {
                 LOGGER.warn("Unable to initiate query logging service !!");
                 e.printStackTrace();
@@ -582,7 +644,8 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     private Connection openConnection(Configuration conf) throws SQLException {
         Connection localConnection;
         try {
-            localConnection = 
HBaseFactoryProvider.getHConnectionFactory().createConnection(conf);
+            localConnection = HBaseFactoryProvider.getHConnectionFactory()
+                    .createConnection(conf, threadPoolExecutor);
             GLOBAL_HCONNECTIONS_COUNTER.increment();
             LOGGER.info("HConnection established. Stacktrace for informational 
purposes: "
                     + localConnection + " " +  LogUtil.getCallerStackTrace());
@@ -634,7 +697,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     public Table getTable(byte[] tableName) throws SQLException {
         try {
             return HBaseFactoryProvider.getHTableFactory().getTable(tableName,
-                connection, null);
+                connection, threadPoolExecutor);
         } catch (IOException e) {
             throw new SQLException(e);
         }
@@ -760,6 +823,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                     try {
                         tableStatsCache.invalidateAll();
                         super.close();
+                        shutdownThreadPool(this.threadPoolExecutor);
                     } catch (SQLException e) {
                         if (sqlE == null) {
                             sqlE = e;
@@ -774,6 +838,20 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
     }
 
+    // Based on org.apache.hadoop.hbase.client.ConnectionImplementation
+    private void shutdownThreadPool(ThreadPoolExecutor pool) {
+        if (pool != null && !pool.isShutdown()) {
+            pool.shutdown();
+            try {
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                    pool.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                pool.shutdownNow();
+            }
+        }
+    }
+
     protected ConnectionQueryServices newChildQueryService() {
         return new ChildQueryServices(this);
     }
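
Taken in isolation, the constructor hunk above builds a bounded, daemon-thread pool. Below is a hedged sketch of the same shape using only java.util.concurrent; the real code uses a ThreadFactoryBuilder and HBase's Threads.LOGGING_EXCEPTION_HANDLER, and buildPool with its parameters is just an illustrative name.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class CqsiPoolSketch {
        // One counter per JVM so each CQSI instance gets a distinct pool name.
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);

        static ThreadPoolExecutor buildPool(String principal, int corePoolSize,
                int maxThreads, int maxQueue, int keepAliveSeconds,
                boolean allowCoreThreadTimeout) {
            // Bounded queue, sized by CQSI_THREAD_POOL_MAX_QUEUE.
            BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(maxQueue);
            String poolName = "CQSI-" + principal + "-" + POOL_NUMBER.incrementAndGet();
            AtomicInteger threadNumber = new AtomicInteger();
            ThreadFactory daemonFactory = r -> {
                // Daemon threads so an idle pool never keeps the JVM alive.
                Thread t = new Thread(r,
                        poolName + "-shared-pool-" + threadNumber.getAndIncrement());
                t.setDaemon(true);
                return t;
            };
            ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads,
                    keepAliveSeconds, TimeUnit.SECONDS, workQueue, daemonFactory);
            // Mirrors CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT.
            pool.allowCoreThreadTimeOut(allowCoreThreadTimeout);
            return pool;
        }
    }

The pool is torn down from close() via shutdownThreadPool(), which waits up to ten seconds before forcing shutdownNow(), so the executor's lifetime matches the CQSI's.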
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/HConnectionFactory.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/HConnectionFactory.java
index c3e0eb54dc..b296b35892 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/HConnectionFactory.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/HConnectionFactory.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.query;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
@@ -38,6 +39,18 @@ public interface HConnectionFactory {
      */
     Connection createConnection(Configuration conf) throws IOException;
 
+    /**
+     * Creates HConnection to access HBase clusters.
+     *
+     * @param conf object
+     * @param pool object
+     * @return A HConnection instance
+     */
+    default Connection createConnection(Configuration conf, ExecutorService 
pool)
+            throws IOException {
+        return createConnection(conf);
+    }
+
     /**
      * Default implementation.  Uses standard HBase HConnections.
      */
@@ -46,5 +59,11 @@ public interface HConnectionFactory {
         public Connection createConnection(Configuration conf) throws 
IOException {
             return ConnectionFactory.createConnection(conf);
         }
+
+        @Override
+        public Connection createConnection(Configuration conf, ExecutorService 
pool)
+                throws IOException {
+            return ConnectionFactory.createConnection(conf, pool);
+        }
     }
 }
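
The two-argument createConnection is a default method, so existing HConnectionFactory implementations that only override the single-argument variant keep compiling and simply ignore the pool. A hedged sketch of a pool-aware custom factory (the class name is hypothetical):

    import java.io.IOException;
    import java.util.concurrent.ExecutorService;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.phoenix.query.HConnectionFactory;

    public class PoolAwareHConnectionFactory implements HConnectionFactory {
        @Override
        public Connection createConnection(Configuration conf) throws IOException {
            return ConnectionFactory.createConnection(conf);
        }

        @Override
        public Connection createConnection(Configuration conf, ExecutorService pool)
                throws IOException {
            // Hand the CQSI-owned executor to the HBase client so the client
            // does not spin up its own batch pool for this connection.
            return ConnectionFactory.createConnection(conf, pool);
        }
    }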
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/HTableFactory.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/HTableFactory.java
index 10a531f198..79607df8e7 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/HTableFactory.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/HTableFactory.java
@@ -47,9 +47,11 @@ public interface HTableFactory {
      */
     static class HTableFactoryImpl implements HTableFactory {
         @Override
-        public Table getTable(byte[] tableName, Connection connection, 
ExecutorService pool) throws IOException {
-            // Let the HBase client manage the thread pool instead of passing 
ours through
-            return connection.getTable(TableName.valueOf(tableName));
+        public Table getTable(byte[] tableName, Connection connection, 
ExecutorService pool)
+                throws IOException {
+            // If CQSI_THREAD_POOL_ENABLED then we pass ExecutorService 
created in CQSI to
+            // HBase Client, else it is null(default), let the HBase client 
manage the thread pool
+            return connection.getTable(TableName.valueOf(tableName), pool);
         }
     }
 }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index bc9320723c..772eac78c5 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -384,6 +384,16 @@ public interface QueryServices extends SQLCloseable {
     public static final String PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT = 
"phoenix.view.ttl.tenant_views_per_scan.limit";
     // Block mutations based on cluster role record
     public static final String CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = 
"phoenix.cluster.role.based.mutation.block.enabled";
+    //Enable Thread Pool Creation in CQSI to be used for HBase Client.
+    String CQSI_THREAD_POOL_ENABLED = "phoenix.cqsi.thread.pool.enabled";
+    //CQSI Thread Pool Related Configuration.
+    String CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = 
"phoenix.cqsi.thread.pool.keepalive.seconds";
+    String CQSI_THREAD_POOL_CORE_POOL_SIZE = 
"phoenix.cqsi.thread.pool.core.size";
+    String CQSI_THREAD_POOL_MAX_THREADS = 
"phoenix.cqsi.thread.pool.max.threads";
+    String CQSI_THREAD_POOL_MAX_QUEUE = "phoenix.cqsi.thread.pool.max.queue";
+    // Enables 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html#allowCoreThreadTimeOut-boolean-
+    String CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT
+            = "phoenix.cqsi.thread.pool.allow.core.thread.timeout";
 
     // Before 4.15 when we created a view we included the parent table column 
metadata in the view
     // metadata. After PHOENIX-3534 we allow SYSTEM.CATALOG to split and no 
longer store the parent
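
The same settings can also be injected through an HBase Configuration, which is the route the new ConnectionQueryServicesImplThreadPoolIT takes via a custom ConfigurationFactory. A hedged sketch using the constants added above (class and method names are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.query.QueryServices;

    public class CqsiPoolConfigSketch {
        public static Configuration cqsiPoolConfig() {
            Configuration conf = HBaseConfiguration.create();
            conf.setBoolean(QueryServices.CQSI_THREAD_POOL_ENABLED, true);
            // Illustrative values; the shipped defaults live in QueryServicesOptions below.
            conf.setInt(QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, 60);
            conf.setInt(QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE, 25);
            conf.setInt(QueryServices.CQSI_THREAD_POOL_MAX_THREADS, 25);
            conf.setInt(QueryServices.CQSI_THREAD_POOL_MAX_QUEUE, 512);
            conf.setBoolean(QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, true);
            return conf;
        }
    }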
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3bd94e4ffa..ca637dbf90 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -75,6 +75,12 @@ import static 
org.apache.phoenix.query.QueryServices.METRIC_PUBLISHER_ENABLED;
 import static 
org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
+import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
 import static org.apache.phoenix.query.QueryServices.PHOENIX_ACLS_ENABLED;
 import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
 import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB;
@@ -454,6 +460,12 @@ public class QueryServicesOptions {
 
 
     public static final Boolean 
DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false;
+    public static final Boolean DEFAULT_CQSI_THREAD_POOL_ENABLED = false;
+    public static final int DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = 60;
+    public static final int DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE = 25;
+    public static final int DEFAULT_CQSI_THREAD_POOL_MAX_THREADS = 25;
+    public static final int DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE = 512;
+    public static final Boolean 
DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true;
 
 
     private final Configuration config;
@@ -558,8 +570,18 @@ public class QueryServicesOptions {
                 DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX)
             .setIfUnset(MAX_IN_LIST_SKIP_SCAN_SIZE, 
DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE)
             .setIfUnset(CONNECTION_ACTIVITY_LOGGING_ENABLED, 
DEFAULT_CONNECTION_ACTIVITY_LOGGING_ENABLED)
-            .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL, 
DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS)
-            .setIfUnset(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, 
DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED);
+            .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL,
+                    DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS)
+            .setIfUnset(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED,
+                    DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED)
+            .setIfUnset(CQSI_THREAD_POOL_ENABLED, 
DEFAULT_CQSI_THREAD_POOL_ENABLED)
+            .setIfUnset(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS,
+                    DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS)
+            .setIfUnset(CQSI_THREAD_POOL_CORE_POOL_SIZE, 
DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE)
+            .setIfUnset(CQSI_THREAD_POOL_MAX_THREADS, 
DEFAULT_CQSI_THREAD_POOL_MAX_THREADS)
+            .setIfUnset(CQSI_THREAD_POOL_MAX_QUEUE, 
DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE)
+            .setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+                    DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);
 
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user 
set
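
With the feature enabled and nothing overridden, the defaults above amount to a pool equivalent to this sketch (values copied from the DEFAULT_CQSI_THREAD_POOL_* constants; not a sizing recommendation):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class DefaultCqsiPoolSketch {
        static ThreadPoolExecutor defaultShapedPool() {
            // 25 core / 25 max threads, 60s keep-alive, bounded queue of 512 tasks.
            ThreadPoolExecutor pool = new ThreadPoolExecutor(25, 25, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(512));
            // Core threads are allowed to time out when idle.
            pool.allowCoreThreadTimeOut(true);
            return pool;
        }
    }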
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
index ab26000567..73da203c6c 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java
@@ -25,9 +25,17 @@ import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestin
 import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
 import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithStatement;
 import static 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup;
+import static 
org.apache.phoenix.query.BaseTest.extractThreadPoolExecutorFromCQSI;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE;
+import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
@@ -48,6 +56,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -113,6 +122,12 @@ public class FailoverPhoenixConnectionIT {
         haGroupName = testName.getMethodName();
         clientProperties = 
HighAvailabilityTestingUtility.getHATestProperties();
         clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+        clientProperties.setProperty(CQSI_THREAD_POOL_ENABLED, 
String.valueOf(true));
+        clientProperties.setProperty(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, 
String.valueOf(13));
+        clientProperties.setProperty(CQSI_THREAD_POOL_CORE_POOL_SIZE, 
String.valueOf(17));
+        clientProperties.setProperty(CQSI_THREAD_POOL_MAX_THREADS, 
String.valueOf(19));
+        clientProperties.setProperty(CQSI_THREAD_POOL_MAX_QUEUE, 
String.valueOf(23));
+        
clientProperties.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, 
String.valueOf(true));
 
         // Make first cluster ACTIVE
         CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER);
@@ -138,6 +153,22 @@ public class FailoverPhoenixConnectionIT {
         }
     }
 
+    @Test
+    public void testCQSIThreadPoolCreation() throws Exception {
+        try (Connection conn = createFailoverConnection()) {
+            FailoverPhoenixConnection failoverConn = 
conn.unwrap(FailoverPhoenixConnection.class);
+
+            // verify connection#1
+            ConnectionQueryServices cqsi = 
PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl1(haGroup),
 clientProperties);
+            ConnectionQueryServices cqsiFromConn = 
failoverConn.getWrappedConnection().getQueryServices();
+            // Check that same ThreadPoolExecutor object is used for CQSIs
+            Assert.assertSame(extractThreadPoolExecutorFromCQSI(cqsi), 
extractThreadPoolExecutorFromCQSI(cqsiFromConn));
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
     /**
      * Test Phoenix connection creation and basic operations with HBase 
cluster pair.
      */
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
index aa2bf52a9a..7b98a50e3a 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
@@ -34,13 +34,22 @@ import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALL
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_EXECUTION_TIME;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_QUEUE_WAIT_TIME;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_PARALLEL_POOL2_TASK_REJECTED_COUNTER;
+import static 
org.apache.phoenix.query.BaseTest.extractThreadPoolExecutorFromCQSI;
 import static org.apache.phoenix.query.QueryServices.AUTO_COMMIT_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE;
+import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
+import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -49,8 +58,10 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -62,6 +73,8 @@ import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.monitoring.GlobalMetric;
 import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.JDBCUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -107,6 +120,13 @@ public class ParallelPhoenixConnectionIT {
         GLOBAL_PROPERTIES.setProperty(AUTO_COMMIT_ATTRIB, "true");
         
GLOBAL_PROPERTIES.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, 
String.valueOf(true));
         GLOBAL_PROPERTIES.setProperty(QueryServices.LOG_LEVEL, 
LogLevel.DEBUG.name()); //Need logging for query metrics
+        GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ENABLED, 
String.valueOf(true));
+        GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, 
String.valueOf(13));
+        GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_CORE_POOL_SIZE, 
String.valueOf(17));
+        GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_THREADS, 
String.valueOf(19));
+        GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_QUEUE, 
String.valueOf(23));
+        
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, 
String.valueOf(true));
+
     }
 
     @AfterClass
@@ -172,6 +192,40 @@ public class ParallelPhoenixConnectionIT {
         }
     }
 
+    @Test
+    public void testDifferentCQSIThreadPoolsForParallelConnection() throws 
Exception {
+        try (Connection conn = getParallelConnection()) {
+            ParallelPhoenixConnection pr = 
conn.unwrap(ParallelPhoenixConnection.class);
+            PhoenixConnection pConn;
+            PhoenixConnection pConn2;
+            if 
(CLUSTERS.getJdbcUrl1(haGroup).equals(pr.getFutureConnection1().get().getURL()))
 {
+                pConn = pr.getFutureConnection1().get();
+                pConn2 = pr.getFutureConnection2().get();
+            } else {
+                pConn = pr.getFutureConnection2().get();
+                pConn2 = pr.getFutureConnection1().get();
+            }
+
+            // verify connection#1
+            ConnectionQueryServices cqsi = 
PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl1(haGroup),
 clientProperties);
+            ConnectionQueryServices cqsiFromConn = pConn.getQueryServices();
+            // Check that same ThreadPoolExecutor object is used for CQSIs
+            ThreadPoolExecutor threadPoolExecutor1 = 
extractThreadPoolExecutorFromCQSI(cqsi);
+            Assert.assertSame(threadPoolExecutor1, 
extractThreadPoolExecutorFromCQSI(cqsiFromConn));
+
+            // verify connection#2
+            cqsi = 
PhoenixDriver.INSTANCE.getConnectionQueryServices(CLUSTERS.getJdbcUrl2(haGroup),
 clientProperties);
+            cqsiFromConn = pConn2.getQueryServices();
+            Assert.assertSame(cqsi, cqsiFromConn);
+            // Check that same ThreadPoolExecutor object is used for CQSIs
+            ThreadPoolExecutor threadPoolExecutor2 = 
extractThreadPoolExecutorFromCQSI(cqsi);
+            Assert.assertSame(extractThreadPoolExecutorFromCQSI(cqsi), 
extractThreadPoolExecutorFromCQSI(cqsiFromConn));
+
+            // Check that both threadPools for parallel connections are 
different.
+            assertNotSame(threadPoolExecutor1, threadPoolExecutor2);
+        }
+    }
+
     /**
      * Test Phoenix connection creation and basic operations with HBase 
cluster(s) unavailable.
      */
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionQueryServicesImplThreadPoolIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionQueryServicesImplThreadPoolIT.java
new file mode 100644
index 0000000000..f2edfcb9cd
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/query/ConnectionQueryServicesImplThreadPoolIT.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.ConnectionImplementation;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
+import org.apache.phoenix.jdbc.ConnectionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE;
+import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConnectionQueryServicesImplThreadPoolIT extends BaseTest {
+
+    private static final Logger LOGGER =
+            
LoggerFactory.getLogger(ConnectionQueryServicesImplThreadPoolIT.class);
+    private AtomicInteger counter = new AtomicInteger();
+    private static HBaseTestingUtility hbaseTestUtil;
+    private String tableName;
+    private static final String CONN_QUERY_SERVICE_CREATE_TABLE = 
"CONN_QUERY_SERVICE_CREATE_TABLE";
+    private static final String CONN_QUERY_SERVICE_1 = "CONN_QUERY_SERVICE_1";
+    private static final String CONN_QUERY_SERVICE_2 = "CONN_QUERY_SERVICE_2";
+    private static final int TEST_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = 13;
+    private static final int TEST_CQSI_THREAD_POOL_CORE_POOL_SIZE = 17;
+    private static final int TEST_CQSI_THREAD_POOL_MAX_THREADS = 19;
+    private static final int TEST_CQSI_THREAD_POOL_MAX_QUEUE = 23;
+
+
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        InstanceResolver.clearSingletons();
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new 
ConfigurationFactory() {
+            @Override public Configuration getConfiguration() {
+                Configuration conf = HBaseConfiguration.create();
+                conf.set(CQSI_THREAD_POOL_ENABLED, Boolean.toString(true));
+                conf.set(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, 
Integer.toString(TEST_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS));
+                conf.set(CQSI_THREAD_POOL_CORE_POOL_SIZE,  
Integer.toString(TEST_CQSI_THREAD_POOL_CORE_POOL_SIZE));
+                conf.set(CQSI_THREAD_POOL_MAX_THREADS,  
Integer.toString(TEST_CQSI_THREAD_POOL_MAX_THREADS));
+                conf.set(CQSI_THREAD_POOL_MAX_QUEUE,  
Integer.toString(TEST_CQSI_THREAD_POOL_MAX_QUEUE));
+                conf.set(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, 
Boolean.toString(true));
+                return conf;
+            }
+
+            @Override public Configuration getConfiguration(Configuration 
confToClone) {
+                Configuration conf = HBaseConfiguration.create();
+                conf.set(CQSI_THREAD_POOL_ENABLED, Boolean.toString(true));
+                conf.set(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, 
Integer.toString(TEST_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS));
+                conf.set(CQSI_THREAD_POOL_CORE_POOL_SIZE,  
Integer.toString(TEST_CQSI_THREAD_POOL_CORE_POOL_SIZE));
+                conf.set(CQSI_THREAD_POOL_MAX_THREADS,  
Integer.toString(TEST_CQSI_THREAD_POOL_MAX_THREADS));
+                conf.set(CQSI_THREAD_POOL_MAX_QUEUE,  
Integer.toString(TEST_CQSI_THREAD_POOL_MAX_QUEUE));
+                conf.set(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, 
Boolean.toString(true));
+                Configuration copy = new Configuration(conf);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+        Configuration conf = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        hbaseTestUtil = new HBaseTestingUtility(conf);
+        setUpConfigForMiniCluster(conf);
+        hbaseTestUtil.startMiniCluster();
+        String zkQuorum = "localhost:" + 
hbaseTestUtil.getZkCluster().getClientPort();
+        url = PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    @AfterClass
+    public static void tearDownMiniCluster() {
+        try {
+            if (hbaseTestUtil != null) {
+                hbaseTestUtil.shutdownMiniCluster();
+            }
+        } catch (Exception e) {
+            // ignore
+        } finally {
+            ServerMetadataCacheTestImpl.resetCache();
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        tableName = generateUniqueName();
+        createTable(tableName);
+    }
+
+    private String connUrlWithPrincipal(String principalName) throws 
SQLException {
+        return ConnectionInfo.create(url, null, 
null).withPrincipal(principalName).toUrl();
+    }
+
+    @Test
+    public void checkHTableThreadPoolExecutorSame() throws Exception {
+        Table table = createCQSI(null).getTable(tableName.getBytes());
+        assertTrue(table instanceof HTable);
+        HTable hTable = (HTable) table;
+        Field props = hTable.getClass().getDeclaredField("pool");
+        props.setAccessible(true);
+        validateThreadPoolExecutor((ThreadPoolExecutor) props.get(hTable));
+    }
+
+    @Test
+    public void checkHConnectionThreadPoolExecutorSame() throws Exception {
+        // Extract Conn1 instance from CQSI1
+        ConnectionImplementation conn1 = 
extractConnectionFromCQSI(createCQSI("hello"));
+        // Extract batchPool from connection in CQSI1
+        ThreadPoolExecutor threadPoolExecutor1FromConnection = 
extractBatchPool(conn1);
+        // Create another CQSI2
+        ConnectionQueryServices connQueryServices2 = createCQSI("bye");
+        // Extract the ThreadPoolExecutor from CQSI2 instance
+        ThreadPoolExecutor threadPoolExecutor2 = 
extractThreadPoolExecutorFromCQSI(connQueryServices2);
+        // Extract Conn2 from CQSI2
+        ConnectionImplementation conn2 = 
extractConnectionFromCQSI(createCQSI("bye"));
+        // Extract batchPool from connection2 in CQSI2
+        ThreadPoolExecutor threadPoolExecutor2FromConnection = 
extractBatchPool(conn2);
+        // Check if ThreadPoolExecutor2 from CQSI and from Connection are Same
+        assertSame(threadPoolExecutor2, threadPoolExecutor2FromConnection);
+        // Check if threadPoolExecutor from connection1 and from conn2 are 
different
+        assertNotSame(threadPoolExecutor1FromConnection, 
threadPoolExecutor2FromConnection);
+
+        // Validate the properties for ThreadPoolExecutors
+        validateThreadPoolExecutor(threadPoolExecutor1FromConnection);
+        validateThreadPoolExecutor(threadPoolExecutor2FromConnection);
+        validateThreadPoolExecutor(threadPoolExecutor2);
+    }
+
+    private static ThreadPoolExecutor 
extractBatchPool(ConnectionImplementation conn) throws NoSuchFieldException, 
IllegalAccessException {
+        Field batchPoolField = conn.getClass().getDeclaredField("batchPool");
+        batchPoolField.setAccessible(true);
+        return (ThreadPoolExecutor) batchPoolField.get(conn);
+    }
+
+    @Test
+    public void testMultipleCQSIThreadPoolsInParallel() throws Exception {
+        ConnectionQueryServices cqsiExternal1  = 
createCQSI(CONN_QUERY_SERVICE_1);
+        ConnectionQueryServices cqsiExternal2  = 
createCQSI(CONN_QUERY_SERVICE_2);
+        Thread cqsiThread1 = new Thread(() -> {
+            try {
+                ConnectionQueryServices cqsi  = 
createCQSI(CONN_QUERY_SERVICE_1);
+                checkSameThreadPool(cqsiExternal1, cqsi);
+                checkDifferentThreadPool(cqsiExternal2, cqsi);
+                
validateThreadPoolExecutor(extractThreadPoolExecutorFromCQSI(cqsi));
+                counter.incrementAndGet();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+        Thread cqsiThread2 = new Thread(() -> {
+            try {
+                ConnectionQueryServices cqsi  = 
createCQSI(CONN_QUERY_SERVICE_1);
+                checkSameThreadPool(cqsiExternal1, cqsi);
+                checkDifferentThreadPool(cqsiExternal2, cqsi);
+                
validateThreadPoolExecutor(extractThreadPoolExecutorFromCQSI(cqsi));
+                counter.incrementAndGet();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+        Thread cqsiThread3 = new Thread(() -> {
+            try {
+                ConnectionQueryServices cqsi  = 
createCQSI(CONN_QUERY_SERVICE_2);
+                checkSameThreadPool(cqsiExternal2, cqsi);
+                checkDifferentThreadPool(cqsiExternal1, cqsi);
+                
validateThreadPoolExecutor(extractThreadPoolExecutorFromCQSI(cqsi));
+                counter.incrementAndGet();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        Thread cqsiThread4 = new Thread(() -> {
+            try {
+                ConnectionQueryServices cqsi  = 
createCQSI(CONN_QUERY_SERVICE_2);
+                checkSameThreadPool(cqsiExternal2, cqsi);
+                checkDifferentThreadPool(cqsiExternal1, cqsi);
+                
validateThreadPoolExecutor(extractThreadPoolExecutorFromCQSI(cqsi));
+                counter.incrementAndGet();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+
+        cqsiThread1.start();
+        cqsiThread2.start();
+        cqsiThread3.start();
+        cqsiThread4.start();
+        cqsiThread1.join();
+        cqsiThread2.join();
+        cqsiThread3.join();
+        cqsiThread4.join();
+
+        assertEquals(4, counter.get());
+    }
+
+    private void checkSameThreadPool(ConnectionQueryServices cqsi1, 
ConnectionQueryServices cqsi2) throws NoSuchFieldException, 
IllegalAccessException {
+        assertSame(extractThreadPoolExecutorFromCQSI(cqsi1), 
extractThreadPoolExecutorFromCQSI(cqsi2));
+    }
+
+    private void checkDifferentThreadPool(ConnectionQueryServices cqsi1, 
ConnectionQueryServices cqsi2) throws NoSuchFieldException, 
IllegalAccessException {
+        assertNotSame(extractThreadPoolExecutorFromCQSI(cqsi1), 
extractThreadPoolExecutorFromCQSI(cqsi2));
+    }
+
+    private ConnectionQueryServices createCQSI(String serviceName) throws 
SQLException {
+        String principalURL = connUrlWithPrincipal(serviceName);
+        Connection conn = DriverManager.getConnection(principalURL);
+        return conn.unwrap(PhoenixConnection.class).getQueryServices();
+    }
+
+    private void validateThreadPoolExecutor(ThreadPoolExecutor 
threadPoolExecutor) {
+        assertEquals(TEST_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, 
threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS));
+        assertEquals(TEST_CQSI_THREAD_POOL_CORE_POOL_SIZE, 
threadPoolExecutor.getCorePoolSize());
+        assertEquals(TEST_CQSI_THREAD_POOL_MAX_THREADS, 
threadPoolExecutor.getMaximumPoolSize());
+        assertEquals(TEST_CQSI_THREAD_POOL_MAX_QUEUE, 
threadPoolExecutor.getQueue().remainingCapacity());
+    }
+
+
+    private void createTable(String tableName) throws SQLException {
+        String CREATE_TABLE_DDL = "CREATE TABLE IF NOT EXISTS %s (K 
VARCHAR(10) NOT NULL"
+                + " PRIMARY KEY, V VARCHAR)";
+        String princURL = 
connUrlWithPrincipal(CONN_QUERY_SERVICE_CREATE_TABLE);
+        LOGGER.info("Connection Query Service : " + 
CONN_QUERY_SERVICE_CREATE_TABLE + " URL : " + princURL);
+        try (Connection conn = DriverManager.getConnection(princURL);
+             Statement stmt = conn.createStatement()) {
+            stmt.execute(String.format(CREATE_TABLE_DDL, tableName));
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    private ConnectionImplementation 
extractConnectionFromCQSI(ConnectionQueryServices cqsi) throws 
NoSuchFieldException, IllegalAccessException {
+        Field connectionField1 = 
cqsi.getClass().getDeclaredField("connection");
+        connectionField1.setAccessible(true);
+        return (ConnectionImplementation) connectionField1.get(cqsi);
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index a7fcfd2cb3..a888a8ac1a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -83,6 +83,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -110,6 +111,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -2188,4 +2190,10 @@ public abstract class BaseTest {
         }
         return false;
     }
+
+    public static ThreadPoolExecutor extractThreadPoolExecutorFromCQSI(final 
ConnectionQueryServices cqs) throws NoSuchFieldException, 
IllegalAccessException {
+        Field props = cqs.getClass().getDeclaredField("threadPoolExecutor");
+        props.setAccessible(true);
+        return (ThreadPoolExecutor) props.get(cqs);
+    }
 }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
index 048bf250aa..f25e7a04ed 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
@@ -21,13 +21,21 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_FOR_MUTEX;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_CORE_POOL_SIZE;
+import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
@@ -42,6 +50,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -58,6 +70,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.phoenix.SystemExitRule;
 import org.apache.phoenix.exception.PhoenixIOException;
+import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -134,6 +147,38 @@ public class ConnectionQueryServicesImplTest {
         doCallRealMethod().when(mockCqs).dropTables(Mockito.any());
     }
 
+    @Test
+    public void testCQSIThreadPoolCreation() throws SQLException, 
NoSuchFieldException, IllegalAccessException {
+        QueryServices mockQueryServices = Mockito.mock(QueryServices.class);
+        ReadOnlyProps readOnlyProps = createCQSIThreadPoolReadOnlyProps();
+        when(mockQueryServices.getProps()).thenReturn(readOnlyProps);
+        ConnectionInfo mockConnectionInfo = Mockito.mock(ConnectionInfo.class);
+        when(mockConnectionInfo.asProps()).thenReturn(readOnlyProps);
+        Properties properties = new Properties();
+        ConnectionQueryServicesImpl cqs = new 
ConnectionQueryServicesImpl(mockQueryServices, mockConnectionInfo, properties);
+        Field props = cqs.getClass().getDeclaredField("threadPoolExecutor");
+        props.setAccessible(true);
+        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) 
props.get(cqs);
+        assertNotNull(threadPoolExecutor);
+        assertEquals(readOnlyProps.getInt(CQSI_THREAD_POOL_CORE_POOL_SIZE, 
-1), threadPoolExecutor.getCorePoolSize());
+        assertEquals(readOnlyProps.getInt(CQSI_THREAD_POOL_MAX_THREADS,-1), 
threadPoolExecutor.getMaximumPoolSize());
+        assertEquals(LinkedBlockingQueue.class, 
threadPoolExecutor.getQueue().getClass());
+        assertEquals(readOnlyProps.getInt(CQSI_THREAD_POOL_MAX_QUEUE, -1), 
threadPoolExecutor.getQueue().remainingCapacity());
+        assertEquals(readOnlyProps.getInt(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, 
-1), threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS));
+        assertTrue(threadPoolExecutor.allowsCoreThreadTimeOut());
+    }
+
+    private static ReadOnlyProps createCQSIThreadPoolReadOnlyProps() {
+        Map<String, String> props = new HashMap<>();
+        props.put(CQSI_THREAD_POOL_ENABLED, Boolean.toString(true));
+        props.put(CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS, Integer.toString(13));
+        props.put(CQSI_THREAD_POOL_CORE_POOL_SIZE,  Integer.toString(17));
+        props.put(CQSI_THREAD_POOL_MAX_THREADS,  Integer.toString(19));
+        props.put(CQSI_THREAD_POOL_MAX_QUEUE,  Integer.toString(23));
+        props.put(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, 
Boolean.toString(true));
+        return new ReadOnlyProps(props);
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testExceptionHandlingOnSystemNamespaceCreation() throws 
Exception {
@@ -330,26 +375,26 @@ public class ConnectionQueryServicesImplTest {
     public void testGetSysMutexTableWithName() throws Exception {
         when(mockAdmin.tableExists(any())).thenReturn(true);
         when(mockConn.getAdmin()).thenReturn(mockAdmin);
-        when(mockConn.getTable(TableName.valueOf("SYSTEM.MUTEX")))
+        when(mockConn.getTable(eq(TableName.valueOf("SYSTEM.MUTEX")), any()))
                 .thenReturn(mockTable);
         assertSame(mockCqs.getSysMutexTable(), mockTable);
         verify(mockAdmin, Mockito.times(1)).tableExists(any());
         verify(mockConn, Mockito.times(1)).getAdmin();
         verify(mockConn, Mockito.times(1))
-                .getTable(TableName.valueOf("SYSTEM.MUTEX"));
+                .getTable(eq(TableName.valueOf("SYSTEM.MUTEX")), any());
     }
 
     @Test
     public void testGetSysMutexTableWithNamespace() throws Exception {
         when(mockAdmin.tableExists(any())).thenReturn(false);
         when(mockConn.getAdmin()).thenReturn(mockAdmin);
-        when(mockConn.getTable(TableName.valueOf("SYSTEM:MUTEX")))
+        when(mockConn.getTable(eq(TableName.valueOf("SYSTEM:MUTEX")), any()))
                 .thenReturn(mockTable);
         assertSame(mockCqs.getSysMutexTable(), mockTable);
         verify(mockAdmin, Mockito.times(1)).tableExists(any());
         verify(mockConn, Mockito.times(1)).getAdmin();
         verify(mockConn, Mockito.times(1))
-                .getTable(TableName.valueOf("SYSTEM:MUTEX"));
+                .getTable(eq(TableName.valueOf("SYSTEM:MUTEX")), any());
     }
 
     @Test
