This is an automated email from the ASF dual-hosted git repository.

sanjeet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 4526040e29 PHOENIX-7626: Add metrics to capture HTable thread pool 
utilization and contention (#2169)
4526040e29 is described below

commit 4526040e297f321203e026fb89c4e0563f84bd06
Author: sanjeet006py <36011005+sanjeet00...@users.noreply.github.com>
AuthorDate: Tue Jun 24 13:31:58 2025 +0530

    PHOENIX-7626: Add metrics to capture HTable thread pool utilization and 
contention (#2169)
---
 .../job/HTableThreadPoolWithUtilizationStats.java  | 142 ++++++++++
 .../monitoring/HTableThreadPoolHistograms.java     | 176 ++++++++++++
 .../monitoring/HTableThreadPoolMetricsManager.java | 158 +++++++++++
 .../phoenix/monitoring/HistogramDistribution.java  |  10 +
 .../phoenix/monitoring/PercentileHistogram.java    | 162 +++++++++++
 .../PercentileHistogramDistribution.java           | 101 +++++++
 .../phoenix/monitoring/UtilizationHistogram.java   |  62 +++++
 .../phoenix/query/ConnectionQueryServicesImpl.java |  54 +++-
 .../org/apache/phoenix/query/QueryServices.java    |   4 +
 .../apache/phoenix/query/QueryServicesOptions.java |  14 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java    |  50 +++-
 .../phoenix/jdbc/ParallelPhoenixConnectionIT.java  | 118 +++++++-
 .../monitoring/BaseHTableThreadPoolMetricsIT.java  | 143 ++++++++++
 .../monitoring/CQSIThreadPoolMetricsIT.java        | 250 +++++++++++++++++
 .../ExternalHTableThreadPoolMetricsIT.java         | 299 +++++++++++++++++++++
 .../org/apache/phoenix/jdbc/PhoenixTestDriver.java |  13 +
 .../java/org/apache/phoenix/query/BaseTest.java    |  11 +-
 17 files changed, 1747 insertions(+), 20 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/job/HTableThreadPoolWithUtilizationStats.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/job/HTableThreadPoolWithUtilizationStats.java
new file mode 100644
index 0000000000..fab9865a33
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/job/HTableThreadPoolWithUtilizationStats.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.job;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.monitoring.HTableThreadPoolHistograms;
+import org.apache.phoenix.monitoring.HTableThreadPoolMetricsManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+
+
+/**
+ * <b>External User-Facing API</b>
+ * <p>
+ * A specialized ThreadPoolExecutor designed specifically for capturing HTable 
thread pool
+ * utilization statistics. This class extends the standard ThreadPoolExecutor 
with built-in
+ * instrumentation to automatically collect key utilization metrics including 
active thread count
+ * and queue size.
+ * </p>
+ * <h3>Purpose</h3>
+ * <p>
+ * Use this ThreadPoolExecutor when you need to monitor and analyze the 
performance characteristics
+ * of HTable thread pool. The collected metrics help in understanding thread 
pool behavior,
+ * identifying bottlenecks, and optimizing thread pool configurations.
+ * </p>
+ * <h3>Setup and Configuration</h3>
+ * <p>
+ * When instantiating this thread pool executor, you must provide an 
idempotent supplier that
+ * returns an instance of {@link HTableThreadPoolHistograms}. This supplier 
enables the collection
+ * of utilization statistics. Within the supplier, you can also attach custom 
tags to the
+ * {@link HTableThreadPoolHistograms} instance for enhanced monitoring and 
filtering capabilities.
+ * </p>
+ * <p>
+ * <b>Important:</b> If you pass a null supplier, metrics collection will be 
completely disabled.
+ * This can be useful in scenarios where you want to use the thread pool 
without the overhead of
+ * collecting utilization statistics.
+ * </p>
+ * <h3>Consuming Metrics</h3>
+ * <p>
+ * To retrieve the collected metrics as percentile distributions:
+ * </p>
+ * <ol>
+ * <li>Call {@link PhoenixRuntime#getHTableThreadPoolHistograms()}</li>
+ * <li>Use the htableThreadPoolHistogramsName as the key to retrieve the list 
of
+ * {@link org.apache.phoenix.monitoring.PercentileHistogramDistribution} 
instances</li>
+ * <li>Each metric type will have its own distribution instance in the 
returned list</li>
+ * </ol>
+ * <p>
+ * Refer to the {@link 
org.apache.phoenix.monitoring.PercentileHistogramDistribution} documentation
+ * to understand how to extract percentile values from the recorded data.
+ * </p>
+ * <h3>Usage Examples</h3>
+ * <p>
+ * For comprehensive usage examples and best practices, refer to the following 
integration tests:
+ * </p>
+ * <ul>
+ * <li>CQSIThreadPoolMetricsIT</li>
+ * <li>ExternalHTableThreadPoolMetricsIT</li>
+ * </ul>
+ * @see HTableThreadPoolHistograms
+ * @see PhoenixRuntime#getHTableThreadPoolHistograms()
+ * @see org.apache.phoenix.monitoring.PercentileHistogramDistribution
+ */
+public class HTableThreadPoolWithUtilizationStats extends ThreadPoolExecutor {
+
+    private final String htableThreadPoolHistogramsName;
+    private final Supplier<HTableThreadPoolHistograms> 
hTableThreadPoolHistogramsSupplier;
+
+    /**
+     * Creates a new HTable thread pool executor with built-in utilization 
statistics collection.
+     * <p>
+     * This constructor accepts all the standard {@link ThreadPoolExecutor} 
parameters plus
+     * additional parameters specific to HTable thread pool monitoring. The 
thread pool will
+     * automatically collect utilization metrics (active thread count and 
queue size) during task
+     * execution. To retrieve the collected metrics, use
+     * {@link PhoenixRuntime#getHTableThreadPoolHistograms()}.
+     * </p>
+     * @param htableThreadPoolHistogramsName Unique identifier for this thread 
pool's metrics. This
+     *                                       name serves as the key in the 
metrics map returned by
+     *                                       {@link 
PhoenixRuntime#getHTableThreadPoolHistograms()}.
+     *                                       Choose a descriptive name that 
identifies the purpose
+     *                                       of this thread pool.
+     * @param supplier                       Idempotent supplier that provides 
the
+     *                                       {@link 
HTableThreadPoolHistograms} instance for metrics
+     *                                       collection. This supplier will be 
called only the first
+     *                                       time a metric is recorded for the 
given
+     *                                       htableThreadPoolHistogramsName. 
Subsequent metric
+     *                                       recordings will reuse the same 
histogram instance.
+     *                                       <b>Pass null to disable metrics 
collection
+     *                                       entirely</b>, which eliminates 
monitoring overhead but
+     *                                       provides no utilization 
statistics.
+     * @see ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, 
BlockingQueue,
+     *      ThreadFactory)
+     * @see HTableThreadPoolHistograms
+     * @see PhoenixRuntime#getHTableThreadPoolHistograms()
+     */
+    public HTableThreadPoolWithUtilizationStats(int corePoolSize, int 
maximumPoolSize,
+                                                long keepAliveTime, TimeUnit 
unit,
+                                                BlockingQueue<Runnable> 
workQueue,
+                                                ThreadFactory threadFactory,
+                                                String 
htableThreadPoolHistogramsName,
+                                                
Supplier<HTableThreadPoolHistograms> supplier) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
+        this.htableThreadPoolHistogramsName = htableThreadPoolHistogramsName;
+        this.hTableThreadPoolHistogramsSupplier = supplier;
+    }
+
+    public void execute(Runnable runnable) {
+        Preconditions.checkNotNull(runnable);
+        if (hTableThreadPoolHistogramsSupplier != null) {
+            
HTableThreadPoolMetricsManager.updateActiveThreads(htableThreadPoolHistogramsName,
+                    this.getActiveCount(), hTableThreadPoolHistogramsSupplier);
+            // Should we offset queue size by available threads if 
CorePoolSize == MaxPoolSize?
+            // Tasks will first be put into thread pool's queue and will be 
consumed by a worker
+            // thread waiting for tasks to arrive in queue. But while a task 
is in queue, queue
+            // size > 0 though active no. of threads might be less than 
MaxPoolSize.
+            
HTableThreadPoolMetricsManager.updateQueueSize(htableThreadPoolHistogramsName,
+                    this.getQueue().size(), 
hTableThreadPoolHistogramsSupplier);
+        }
+        super.execute(runnable);
+    }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolHistograms.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolHistograms.java
new file mode 100644
index 0000000000..4bdd8fd65a
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolHistograms.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.List;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
+/**
+ * A collection of histograms that monitor HTable thread pool performance 
metrics including thread
+ * utilization and queue contention. This class tracks two key metrics to help 
identify thread pool
+ * bottlenecks and performance issues. <br/>
+ * <br/>
+ * <b>External User-Facing Class:</b><br/>
+ * This is an external user-facing class that should only be used in 
conjunction with
+ * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}. When 
creating an instance of
+ * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}, a
+ * {@link java.util.function.Supplier} of this class must be provided. <br/>
+ * <br/>
+ * <b>Monitored Metrics:</b><br/>
+ * <ul>
+ * <li><b>Active Threads Count</b> - Number of threads currently executing 
tasks</li>
+ * <li><b>Queue Size</b> - Number of tasks waiting in the thread pool 
queue</li>
+ * </ul>
+ * <br/>
+ * Each metric is captured using a {@link UtilizationHistogram} that provides 
percentile
+ * distributions, min/max values, and operation counts for comprehensive 
analysis. <br/>
+ * <br/>
+ * <b>Tags:</b><br/>
+ * Supports metadata tags for dimensional monitoring:
+ * <ul>
+ * <li><b>servers</b> - Connection quorum string (ZK quorum, master quorum, 
etc.)</li>
+ * <li><b>cqsiName</b> - Principal identifier for CQSI instance 
differentiation</li>
+ * <li>Custom tags via {@link #addTag(String, String)}</li>
+ * </ul>
+ * The instance created by a supplier can also add tags to provide additional 
context. <br/>
+ * <br/>
+ * <b>Instance Management:</b><br/>
+ * For instances created internally by the CQSI class: One instance per unique 
connection info is
+ * created at the CQSI level, and multiple CQSI instances having the same 
connection info will share
+ * the same instance of this class. <br/>
+ * For instances created externally by users: This instance management detail 
does not apply, and
+ * users have full control over instance creation. <br/>
+ * <br/>
+ * Use {@link #getThreadPoolHistogramsDistribution()} to retrieve immutable 
snapshots of the
+ * collected metrics for monitoring and analysis.
+ */
+public class HTableThreadPoolHistograms {
+    /**
+     * Predefined tag keys for dimensional monitoring and contextual 
categorization of histogram
+     * instances. These tags provide context about the connection and CQSI 
instance associated with
+     * the metrics.
+     */
+    public enum Tag {
+        servers,
+        cqsiName,
+    }
+
+    /**
+     * Enum that captures the name of each of the monitored metrics. These 
names correspond to the
+     * specific thread pool performance metrics being tracked.
+     */
+    public enum HistogramName {
+        ActiveThreadsCount,
+        QueueSize,
+    }
+
+    private final UtilizationHistogram activeThreadsHisto;
+    private final UtilizationHistogram queuedSizeHisto;
+
+    /**
+     * Creates a new instance of HTableThreadPoolHistograms with the specified 
maximum values for
+     * thread pool and queue size.
+     * @param maxThreadPoolSize the maximum number of threads in the thread 
pool, used to configure
+     *                          the active threads histogram
+     * @param maxQueueSize      the maximum size of the thread pool queue, 
used to configure the
+     *                          queue size histogram
+     */
+    public HTableThreadPoolHistograms(long maxThreadPoolSize, long 
maxQueueSize) {
+        activeThreadsHisto = new UtilizationHistogram(maxThreadPoolSize,
+                HistogramName.ActiveThreadsCount.name());
+        queuedSizeHisto = new UtilizationHistogram(maxQueueSize, 
HistogramName.QueueSize.name());
+    }
+
+    /**
+     * Updates the histogram that tracks active threads count with the current 
number of active
+     * threads. <br/>
+     * <br/>
+     * This method is to be called from HTableThreadPoolUtilizationStats class 
only and should not
+     * be used from outside Phoenix.
+     * @param activeThreads the current number of threads actively executing 
tasks
+     */
+    public void updateActiveThreads(long activeThreads) {
+        activeThreadsHisto.addValue(activeThreads);
+    }
+
+    /**
+     * Updates the histogram that tracks queue size with the current number of 
queued tasks. <br/>
+     * <br/>
+     * This method is to be called from HTableThreadPoolUtilizationStats class 
only and should not
+     * be used from outside Phoenix.
+     * @param queuedSize the current number of tasks waiting in the thread 
pool queue
+     */
+    public void updateQueuedSize(long queuedSize) {
+        queuedSizeHisto.addValue(queuedSize);
+    }
+
+    /**
+     * Adds a server tag for dimensional monitoring that identifies the 
connection quorum string
+     * such as ZK quorum, master quorum, etc. This corresponds to the 
"servers" tag key. <br/>
+     * <br/>
+     * This is an external user-facing method which can be called when 
creating an instance of the
+     * class.
+     * @param value the connection quorum string value
+     */
+    public void addServerTag(String value) {
+        addTag(Tag.servers.name(), value);
+    }
+
+    /**
+     * Adds a CQSI name tag that captures the principal of the CQSI instance. 
This corresponds to
+     * the "cqsiName" tag key. <br/>
+     * <br/>
+     * This is an external user-facing method which can be called while 
creating instance of the
+     * class.
+     * @param value the principal identifier for the CQSI instance
+     */
+    public void addCqsiNameTag(String value) {
+        addTag(Tag.cqsiName.name(), value);
+    }
+
+    /**
+     * Adds a custom tag with the specified key-value pair for dimensional 
monitoring. This method
+     * allows adding arbitrary tags beyond the predefined servers and CQSI 
name tags. <br/>
+     * <br/>
+     * This is an external user-facing method which can be called while 
creating instance of the
+     * class.
+     * @param key   the tag key
+     * @param value the tag value
+     */
+    public void addTag(String key, String value) {
+        activeThreadsHisto.addTag(key, value);
+        queuedSizeHisto.addTag(key, value);
+    }
+
+    /**
+     * Returns a list of HistogramDistribution which are immutable snapshots 
containing percentile
+     * distribution, min/max values, and count of values for the monitored 
metrics (active threads
+     * count and queue size). <br/>
+     * <br/>
+     * This method is to be called from
+     * {@link HTableThreadPoolMetricsManager#getHistogramsForAllThreadPools()} 
only and should not
+     * be used from outside Phoenix.
+     * @return list of HistogramDistribution instances representing 
comprehensive snapshots of the
+     *         metrics
+     */
+    public List<HistogramDistribution> getThreadPoolHistogramsDistribution() {
+        return 
ImmutableList.of(activeThreadsHisto.getPercentileHistogramDistribution(),
+                queuedSizeHisto.getPercentileHistogramDistribution());
+    }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolMetricsManager.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolMetricsManager.java
new file mode 100644
index 0000000000..52843ece87
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolMetricsManager.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central registry and manager for HTable thread pool utilization and 
contention metrics.
+ * <p>
+ * <b>Internal Use Only:</b> This class is designed for internal use only and 
should not be used
+ * directly from outside Phoenix. External consumers should access thread pool 
metrics through
+ * {@link 
org.apache.phoenix.util.PhoenixRuntime#getHTableThreadPoolHistograms()}.
+ * </p>
+ * <p>
+ * This class serves as a singleton registry that maintains {@link 
HTableThreadPoolHistograms}
+ * instances across the entire application lifecycle. Each {@link 
HTableThreadPoolHistograms}
+ * instance contains utilization metrics (active thread count and queue size 
histograms) for a
+ * specific HTable thread pool identified by a unique histogram key.
+ * </p>
+ * <h3>Storage and Instance Management</h3>
+ * <p>
+ * The manager stores {@link HTableThreadPoolHistograms} instances in a 
thread-safe
+ * {@link ConcurrentHashMap} with the following characteristics:
+ * </p>
+ * <ul>
+ * <li><b>Key-based storage:</b> Each histogram instance is identified by a 
unique string key</li>
+ * <li><b>Lazy initialization:</b> Instances are created on-demand when first 
accessed</li>
+ * <li><b>Shared instances:</b> Multiple threads/components can share the same 
histogram instance
+ * using the same key</li>
+ * <li><b>Thread-safe operations:</b> All storage operations are atomic and 
thread-safe</li>
+ * </ul>
+ * <h3>Integration with HTable Thread Pool Utilization</h3>
+ * <p>
+ * This manager is exclusively used by
+ * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}, a 
specialized
+ * ThreadPoolExecutor that automatically captures HTable thread pool 
utilization statistics. The
+ * integration workflow is:
+ * </p>
+ * <ol>
+ * <li>The thread pool calls {@link #updateActiveThreads} and {@link 
#updateQueueSize} to record
+ * metrics</li>
+ * <li>Metrics are stored in {@link HTableThreadPoolHistograms} instances 
managed by this class</li>
+ * <li>External consumers access immutable metric snapshots through
+ * {@link 
org.apache.phoenix.util.PhoenixRuntime#getHTableThreadPoolHistograms()}, which 
internally
+ * calls {@link #getHistogramsForAllThreadPools()} to return {@link 
HistogramDistribution} instances
+ * containing percentile distributions, min/max values, and operation counts 
for both active thread
+ * count and queue size metrics</li>
+ * </ol>
+ * <h3>Usage Patterns</h3>
+ * <p>
+ * <b>CQSI Level:</b> For ConnectionQueryServicesImpl thread pools, the 
histogram key is always the
+ * connection URL, allowing multiple CQSI instances with the same connection 
info to share the same
+ * {@link HTableThreadPoolHistograms} instance.
+ * </p>
+ * <p>
+ * <b>External Thread Pools:</b> For user-defined thread pools, the histogram 
key can be the thread
+ * pool name or any unique identifier chosen by the application.
+ * </p>
+ * @see HTableThreadPoolHistograms
+ * @see org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats
+ * @see org.apache.phoenix.util.PhoenixRuntime#getHTableThreadPoolHistograms()
+ */
+public class HTableThreadPoolMetricsManager {
+
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(HTableThreadPoolMetricsManager.class);
+
+    private static final ConcurrentHashMap<String, HTableThreadPoolHistograms>
+        THREAD_POOL_HISTOGRAMS_MAP = new ConcurrentHashMap<>();
+
+    private HTableThreadPoolMetricsManager() {
+    }
+
+    public static Map<String, List<HistogramDistribution>> 
getHistogramsForAllThreadPools() {
+        Map<String, List<HistogramDistribution>> map = new HashMap<>();
+        for (Map.Entry<String, HTableThreadPoolHistograms> entry
+            : THREAD_POOL_HISTOGRAMS_MAP.entrySet()) {
+            HTableThreadPoolHistograms hTableThreadPoolHistograms = 
entry.getValue();
+            map.put(entry.getKey(),
+                
hTableThreadPoolHistograms.getThreadPoolHistogramsDistribution());
+        }
+        return map;
+    }
+
+    private static HTableThreadPoolHistograms getThreadPoolHistograms(
+        String histogramKey, Supplier<HTableThreadPoolHistograms> supplier) {
+        if (supplier == null) {
+            return null;
+        }
+        return THREAD_POOL_HISTOGRAMS_MAP.computeIfAbsent(histogramKey, k -> 
supplier.get());
+    }
+
+    /**
+     * Records the current number of active threads in the thread pool in the 
histogram.
+     * @param histogramKey  Key to uniquely identify {@link 
HTableThreadPoolHistograms} instance.
+     * @param activeThreads Number of active threads in the thread pool.
+     * @param supplier      An idempotent supplier of {@link 
HTableThreadPoolHistograms}.
+     */
+    public static void updateActiveThreads(String histogramKey, int 
activeThreads,
+        Supplier<HTableThreadPoolHistograms> supplier) {
+        HTableThreadPoolHistograms hTableThreadPoolHistograms =
+            getThreadPoolHistograms(histogramKey, supplier);
+        if (hTableThreadPoolHistograms != null) {
+            hTableThreadPoolHistograms.updateActiveThreads(activeThreads);
+        } else {
+            logWarningForNullSupplier(histogramKey);
+        }
+    }
+
+    /**
+     * Records the current number of tasks in the thread pool's queue in the 
histogram.
+     * @param histogramKey Key to uniquely identify {@link 
HTableThreadPoolHistograms} instance.
+     * @param queueSize    Number of tasks in the HTable thread pool's queue.
+     * @param supplier     An idempotent supplier of {@link 
HTableThreadPoolHistograms}.
+     */
+    public static void updateQueueSize(String histogramKey, int queueSize,
+        Supplier<HTableThreadPoolHistograms> supplier) {
+        HTableThreadPoolHistograms hTableThreadPoolHistograms =
+            getThreadPoolHistograms(histogramKey, supplier);
+        if (hTableThreadPoolHistograms != null) {
+            hTableThreadPoolHistograms.updateQueuedSize(queueSize);
+        } else {
+            logWarningForNullSupplier(histogramKey);
+        }
+    }
+
+    private static void logWarningForNullSupplier(String threadPoolName) {
+        LOGGER.warn("No HTable thread pool histograms created for thread pool 
{}", threadPoolName);
+    }
+
+    @VisibleForTesting
+    public static void clearHTableThreadPoolHistograms() {
+        THREAD_POOL_HISTOGRAMS_MAP.clear();
+    }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HistogramDistribution.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HistogramDistribution.java
index 4e8039c27f..269bd65c9c 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HistogramDistribution.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HistogramDistribution.java
@@ -19,6 +19,8 @@ package org.apache.phoenix.monitoring;
 
 import java.util.Map;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
 public interface HistogramDistribution {
     public long getMin();
 
@@ -30,4 +32,12 @@ public interface HistogramDistribution {
 
     public Map<String, Long> getRangeDistributionMap();
 
+    default ImmutableMap<String, Long> getPercentileDistributionMap() {
+        throw new UnsupportedOperationException("Percentile Histogram 
Distribution is not "
+                + "supported!!");
+    }
+
+    default ImmutableMap<String, String> getTags() {
+        return ImmutableMap.of();
+    }
 }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogram.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogram.java
new file mode 100644
index 0000000000..adbaabdc55
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogram.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for creating percentile-based histograms that capture 
and analyze the
+ * distribution of recorded values. This histogram tracks values up to a 
specified maximum and
+ * provides statistical analysis including percentile distributions, min/max 
values, and operation
+ * counts.
+ * <p>
+ * <b>Internal Use Only:</b> This class is for internal use only and should 
not be used directly by
+ * users of Phoenix.
+ * </p>
+ * <br/>
+ * Key features:
+ * <ul>
+ * <li>Records values efficiently using {@link org.HdrHistogram.Histogram} 
internally</li>
+ * <li>Generates percentile distributions through concrete implementations</li>
+ * <li>Supports optional metadata tags for enhanced monitoring context</li>
+ * <li>Generates immutable statistical snapshots with percentile analysis</li>
+ * <li>Automatically handles values exceeding the maximum trackable limit</li>
+ * </ul>
+ * <br/>
+ * Usage workflow:
+ * <ol>
+ * <li>Record values using {@link #addValue(long)}</li>
+ * <li>Optionally attach metadata using {@link #addTag(String, String)}</li>
+ * <li>Generate distribution snapshots via {@link 
#getPercentileHistogramDistribution()}</li>
+ * </ol>
+ * <br/>
+ * Concrete implementations must define {@link 
#generateDistributionMap(Histogram)} to specify which
+ * percentiles and statistics to include in the distribution.
+ */
+abstract class PercentileHistogram {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PercentileHistogram.class);
+
+    // Strings used to create metrics names.
+    static final String NUM_OPS_METRIC_NAME = "_num_ops";
+    static final String MIN_METRIC_NAME = "_min";
+    static final String MAX_METRIC_NAME = "_max";
+    static final String MEDIAN_METRIC_NAME = "_median";
+    static final String TWENTY_FIFTH_PERCENTILE_METRIC_NAME = 
"_25th_percentile";
+    static final String SEVENTY_FIFTH_PERCENTILE_METRIC_NAME = 
"_75th_percentile";
+    static final String NINETIETH_PERCENTILE_METRIC_NAME = "_90th_percentile";
+    static final String NINETY_FIFTH_PERCENTILE_METRIC_NAME = 
"_95th_percentile";
+
+    private Histogram prevHistogram = null;
+    /**
+     * The recorder that records the values in a {@link Histogram}.
+     */
+    private final Recorder recorder;
+    private final String name;
+    private final long maxUtil;
+    private Map<String, String> tags = null;
+
+    PercentileHistogram(long maxUtil, String name) {
+        this.name = name;
+        this.maxUtil = maxUtil;
+        this.recorder = new Recorder(maxUtil, 2);
+    }
+
+    /**
+     * Records a value in the histogram for statistical analysis. The recorded 
value will be
+     * included in subsequent percentile calculations and distribution 
snapshot generated by
+     * {@link #getPercentileHistogramDistribution()}. <br/>
+     * <br/>
+     * Values exceeding the maximum trackable limit (specified during 
histogram construction) are
+     * automatically rejected with a warning logged. This prevents histogram 
corruption while
+     * maintaining recording performance.
+     * @param value the value to record, must be within the histogram's 
trackable range
+     */
+    void addValue(long value) {
+        if (value > maxUtil) {
+            // Ignoring recording value more than maximum trackable value.
+            LOGGER.warn("Histogram recording higher value than maximum. 
Ignoring it.");
+            return;
+        }
+        recorder.recordValue(value);
+    }
+
+    /**
+     * Generates an immutable snapshot containing the percentile distribution 
of values recorded
+     * since the last call to this method. The snapshot includes statistical 
metrics such as min/max
+     * values, operation count, percentile values, and any attached tags. <br/>
+     * <br/>
+     * This method captures an interval histogram (values recorded since the 
previous snapshot) and
+     * delegates to concrete implementations via {@link 
#generateDistributionMap(Histogram)} to
+     * determine which specific percentiles and metrics to include. <br/>
+     * <br/>
+     * The returned {@link PercentileHistogramDistribution} is thread-safe and 
immutable, making it
+     * suitable for concurrent access by monitoring systems.
+     * @return an immutable {@link HistogramDistribution} containing 
percentile analysis and
+     *         statistics
+     */
+    HistogramDistribution getPercentileHistogramDistribution() {
+        Histogram histogram = 
this.recorder.getIntervalHistogram(prevHistogram);
+        HistogramDistribution distribution;
+        if (tags == null) {
+            distribution = new PercentileHistogramDistribution(name, 
histogram.getMinValue(),
+                    histogram.getMaxValue(), histogram.getTotalCount(),
+                    generateDistributionMap(histogram));
+        } else {
+            distribution = new PercentileHistogramDistribution(name, 
histogram.getMinValue(),
+                    histogram.getMaxValue(), histogram.getTotalCount(),
+                    generateDistributionMap(histogram), tags);
+        }
+        this.prevHistogram = histogram;
+        return distribution;
+    }
+
+    /**
+     * Attaches a metadata tag to the histogram as a key-value pair. Tags 
provide additional context
+     * about the histogram data and are included in all distribution snapshots 
generated by
+     * {@link #getPercentileHistogramDistribution()}. <br/>
+     * <br/>
+     * Tags are commonly used for dimensional monitoring, allowing metrics to 
be filtered and
+     * grouped by tag names. Multiple tags can be added to the same histogram, 
and duplicate keys
+     * will overwrite previous values.
+     * @param key   the tag key
+     * @param value the tag value
+     */
+    void addTag(String key, String value) {
+        if (tags == null) {
+            tags = new HashMap<>();
+        }
+        tags.put(key, value);
+    }
+
    /**
     * Hook for subclasses to decide which percentiles and statistics are published. Returns a
     * map keyed by metric-name suffix (e.g. "_90th_percentile", "_median") with the value read
     * from the supplied interval histogram. Called only from
     * {@link #getPercentileHistogramDistribution()}; not intended for external use.
     * @param snapshot the interval {@link Histogram} snapshot to summarize
     * @return a map of metric-name suffixes to their corresponding values
     */
    protected abstract Map<String, Long> generateDistributionMap(Histogram snapshot);
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogramDistribution.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogramDistribution.java
new file mode 100644
index 0000000000..5c81d9c5da
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogramDistribution.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * <b>External User-Facing API</b>
+ * <p>
+ * An immutable snapshot of percentile distribution data captured by a {@link 
PercentileHistogram}.
+ * This class provides a consumer-friendly interface to access histogram 
statistics without exposing
+ * the underlying {@link org.HdrHistogram.Histogram} instances. <br/>
+ * <br/>
+ * This class contains:
+ * <ul>
+ * <li>Percentile distribution map with percentile names as keys and their 
values</li>
+ * <li>Basic statistics (minimum, maximum, total count)</li>
+ * <li>Optional tags for additional metadata</li>
+ * </ul>
+ * <br/>
+ * Use {@link PercentileHistogram#getPercentileHistogramDistribution()} to get 
the percentile
+ * distribution captured by the histogram. Use {@link #getTags()} to access 
any tags attached to the
+ * histogram. <br/>
+ * <br/>
+ * All data in this class is immutable after construction, making it safe for 
concurrent access.
+ */
+public class PercentileHistogramDistribution extends HistogramDistributionImpl 
{
+    private final ImmutableMap<String, Long> percentileDistributionMap;
+    private ImmutableMap<String, String> tags = null;
+
+    PercentileHistogramDistribution(String histoName, long min, long max, long 
count,
+                                           Map<String, Long> 
percentileDistributionMap) {
+        super(histoName, min, max, count, Collections.emptyMap());
+        this.percentileDistributionMap = 
ImmutableMap.copyOf(percentileDistributionMap);
+    }
+
+    PercentileHistogramDistribution(String histoName, long min, long max, long 
count,
+                                           Map<String, Long> 
percentileDistributionMap,
+                                           Map<String, String> tags) {
+        this(histoName, min, max, count, percentileDistributionMap);
+        this.tags = ImmutableMap.copyOf(tags);
+    }
+
+    /**
+     * Returns an immutable map containing the percentile distribution and 
statistical data captured
+     * by the histogram. The map contains metric names as keys and their 
corresponding values. <br/>
+     * <br/>
+     * The map includes:
+     * <ul>
+     * <li>Percentile values (e.g., "_90th_percentile", "_95th_percentile", 
"_median")</li>
+     * <li>Statistical metrics (e.g., "_min", "_max", "_num_ops")</li>
+     * </ul>
+     * <br/>
+     * This is the primary method for accessing percentile analysis results. 
The specific
+     * percentiles and statistics included depend on the concrete {@link 
PercentileHistogram}
+     * implementation that generated this distribution. <br/>
+     * <br/>
+     * The returned map is immutable and safe for concurrent access.
+     * @return an immutable map of metric names to their calculated values
+     */
+    public ImmutableMap<String, Long> getPercentileDistributionMap() {
+        return percentileDistributionMap;
+    }
+
+    public Map<String, Long> getRangeDistributionMap() {
+        throw new UnsupportedOperationException("Range Histogram Distribution 
is not supported!!");
+    }
+
+    /**
+     * Returns the metadata tags associated with this histogram distribution. 
Tags provide
+     * additional context about the histogram data and are commonly used for 
dimensional monitoring,
+     * allowing metrics to be filtered and grouped by tag names. <br/>
+     * <br/>
+     * Tags are attached to the histogram using {@link 
PercentileHistogram#addTag(String, String)}
+     * before generating the distribution snapshot. <br/>
+     * <br/>
+     * The returned map is immutable and safe for concurrent access.
+     * @return an immutable map of tag key-value pairs, or an empty map if no 
tags were attached
+     */
+    public ImmutableMap<String, String> getTags() {
+        return tags == null ? ImmutableMap.of() : tags;
+    }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/UtilizationHistogram.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/UtilizationHistogram.java
new file mode 100644
index 0000000000..bc136fec3f
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/UtilizationHistogram.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.HdrHistogram.Histogram;
+
+/**
+ * A concrete implementation of {@link PercentileHistogram} specifically 
designed for tracking
+ * utilization metrics. This histogram captures and provides a comprehensive 
set of statistical
+ * metrics including percentile distributions, operation counts, and min/max 
values. <br/>
+ * <br/>
+ * <p>
+ * <b>Internal Use Only:</b> This class is for internal use only and should 
not be used directly by
+ * users of Phoenix.
+ * </p>
+ * <p>
+ * The histogram generates the following metrics:
+ * </p>
+ * <ul>
+ * <li>Number of operations (total count)</li>
+ * <li>Minimum and maximum recorded values</li>
+ * <li>25th, 50th (median), 75th, 90th, and 95th percentiles</li>
+ * </ul>
+ */
+class UtilizationHistogram extends PercentileHistogram {
+
+    UtilizationHistogram(long maxUtil, String name) {
+        super(maxUtil, name);
+    }
+
+    protected Map<String, Long> generateDistributionMap(Histogram snapshot) {
+        Map<String, Long> metrics = new HashMap<>();
+        metrics.put(NUM_OPS_METRIC_NAME, snapshot.getTotalCount());
+        metrics.put(MIN_METRIC_NAME, snapshot.getMinValue());
+        metrics.put(MAX_METRIC_NAME, snapshot.getMaxValue());
+        metrics.put(TWENTY_FIFTH_PERCENTILE_METRIC_NAME, 
snapshot.getValueAtPercentile(25));
+        metrics.put(MEDIAN_METRIC_NAME, snapshot.getValueAtPercentile(50));
+        metrics.put(SEVENTY_FIFTH_PERCENTILE_METRIC_NAME,
+                snapshot.getValueAtPercentile(75));
+        metrics.put(NINETIETH_PERCENTILE_METRIC_NAME, 
snapshot.getValueAtPercentile(90));
+        metrics.put(NINETY_FIFTH_PERCENTILE_METRIC_NAME, 
snapshot.getValueAtPercentile(95));
+        return metrics;
+    }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 6161732357..b2c7f3b4e3 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -78,6 +78,7 @@ import static 
org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAIL
 import static 
org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_SUCCESS;
 import static 
org.apache.phoenix.monitoring.MetricType.TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS;
 import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_VIEW_TTL_ENABLED;
@@ -144,6 +145,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
 import javax.annotation.concurrent.GuardedBy;
@@ -243,14 +245,18 @@ import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
+import org.apache.phoenix.jdbc.AbstractRPCConnectionInfo;
 import org.apache.phoenix.jdbc.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.RPCConnectionInfo;
+import org.apache.phoenix.jdbc.ZKConnectionInfo;
+import org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats;
 import org.apache.phoenix.log.ConnectionLimiter;
 import org.apache.phoenix.log.DefaultConnectionLimiter;
 import org.apache.phoenix.log.LoggingConnectionLimiter;
 import org.apache.phoenix.log.QueryLoggerDisruptor;
+import org.apache.phoenix.monitoring.HTableThreadPoolHistograms;
 import org.apache.phoenix.monitoring.TableMetricsManager;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
@@ -511,16 +517,18 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             // Based on implementations used in
             // org.apache.hadoop.hbase.client.ConnectionImplementation
             final BlockingQueue<Runnable> workQueue = new 
LinkedBlockingQueue<>(maxQueue);
+            Supplier<HTableThreadPoolHistograms> 
hTableThreadPoolHistogramsSupplier =
+                    getThreadPoolHistogramsSupplier(maxThreads, maxQueue);
             this.threadPoolExecutor =
-                    new ThreadPoolExecutor(corePoolSize, maxThreads, 
keepAlive, TimeUnit.SECONDS,
-                            workQueue, new ThreadFactoryBuilder()
-                                        .setDaemon(true)
-                                        .setNameFormat("CQSI-" + threadPoolName
-                                                + "-" + 
threadPoolNumber.incrementAndGet()
-                                                + "-shared-pool-%d")
-                                        .setUncaughtExceptionHandler(
-                                                
Threads.LOGGING_EXCEPTION_HANDLER)
-                                        .build());
+                    new HTableThreadPoolWithUtilizationStats(corePoolSize, 
maxThreads, keepAlive,
+                            TimeUnit.SECONDS, workQueue, new 
ThreadFactoryBuilder()
+                            .setDaemon(true)
+                            .setNameFormat("CQSI-" + threadPoolName
+                                    + "-" + threadPoolNumber.incrementAndGet()
+                                    + "-shared-pool-%d")
+                            .setUncaughtExceptionHandler(
+                                    Threads.LOGGING_EXCEPTION_HANDLER)
+                            .build(), connectionInfo.toUrl(), 
hTableThreadPoolHistogramsSupplier);
             this.threadPoolExecutor.allowCoreThreadTimeOut(finalConfig
                     .getBoolean(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
                     DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT));
@@ -642,6 +650,34 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 .getMetadataCachingSource();
     }
 
+    private Supplier<HTableThreadPoolHistograms> 
getThreadPoolHistogramsSupplier(
+            int maxThreadPoolSize, int maxQueueSize) {
+        if (this.config.getBoolean(CQSI_THREAD_POOL_METRICS_ENABLED,
+                DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED)) {
+            return new Supplier<HTableThreadPoolHistograms>() {
+                @Override
+                public HTableThreadPoolHistograms get() {
+                    HTableThreadPoolHistograms hTableThreadPoolHistograms =
+                            new HTableThreadPoolHistograms(maxThreadPoolSize, 
maxQueueSize);
+                    if (connectionInfo instanceof ZKConnectionInfo) {
+                        hTableThreadPoolHistograms.addServerTag(
+                                ((ZKConnectionInfo) 
connectionInfo).getZkHosts());
+                    } else if (connectionInfo instanceof 
AbstractRPCConnectionInfo) {
+                        hTableThreadPoolHistograms.addServerTag(
+                                ((AbstractRPCConnectionInfo) 
connectionInfo).getBoostrapServers());
+                    } else {
+                        throw new IllegalStateException("Unexpected connection 
info type!!");
+                    }
+                    String cqsiName = connectionInfo.getPrincipal();
+                    hTableThreadPoolHistograms.addCqsiNameTag(cqsiName != null
+                            ? cqsiName : DEFAULT_QUERY_SERVICES_NAME);
+                    return hTableThreadPoolHistograms;
+                }
+            };
+        }
+        return null;
+    }
+
     private Connection openConnection(Configuration conf) throws SQLException {
         Connection localConnection;
         try {
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index dc0fab5a2c..e16e716958 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -510,6 +510,8 @@ public interface QueryServices extends SQLCloseable {
      */
     String TESTS_MINI_CLUSTER_NUM_REGION_SERVERS = 
"phoenix.tests.minicluster.numregionservers";
 
+    String TESTS_MINI_CLUSTER_NUM_MASTERS = 
"phoenix.tests.minicluster.nummasters";
+
 
     /**
      * Config to inject any processing after the client retrieves dummy result 
from the server.
@@ -553,6 +555,8 @@ public interface QueryServices extends SQLCloseable {
 
     String PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT = 
"phoenix.streams.get.table.regions.timeout";
 
+    String CQSI_THREAD_POOL_METRICS_ENABLED = 
"phoenix.cqsi.thread.pool.metrics.enabled";
+
     /**
      * Get executor service used for parallel scans
      */
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index d7da58f3c2..bb5ef67e49 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -36,6 +36,7 @@ import static 
org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_ME
 import static 
org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED;
 import static 
org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME;
 import static 
org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_METRICS_ENABLED;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
@@ -467,6 +468,7 @@ public class QueryServicesOptions {
     public static final int DEFAULT_CQSI_THREAD_POOL_MAX_THREADS = 25;
     public static final int DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE = 512;
     public static final Boolean 
DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true;
+    public static final Boolean DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED = 
false;
 
 
     private final Configuration config;
@@ -582,7 +584,8 @@ public class QueryServicesOptions {
             .setIfUnset(CQSI_THREAD_POOL_MAX_THREADS, 
DEFAULT_CQSI_THREAD_POOL_MAX_THREADS)
             .setIfUnset(CQSI_THREAD_POOL_MAX_QUEUE, 
DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE)
             .setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
-                    DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);
+                    DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT)
+            .setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED, 
DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED);
 
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user 
set
@@ -823,6 +826,15 @@ public class QueryServicesOptions {
                 .getBoolean(TABLE_LEVEL_METRICS_ENABLED, 
DEFAULT_IS_TABLE_LEVEL_METRICS_ENABLED);
     }
 
+    public boolean isCQSIThreadPoolMetricsEnabled() {
+        return config.getBoolean(CQSI_THREAD_POOL_METRICS_ENABLED,
+                DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED);
+    }
+
+    public void setCQSIThreadPoolMetricsEnabled(boolean enabled) {
+        config.setBoolean(CQSI_THREAD_POOL_METRICS_ENABLED, enabled);
+    }
+
     public void setTableLevelMetricsEnabled() {
         set(TABLE_LEVEL_METRICS_ENABLED, true);
     }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 88ad2e36ef..b5f98ce457 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.OrderByExpression;
@@ -67,12 +66,12 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.monitoring.GlobalMetric;
+import org.apache.phoenix.monitoring.HTableThreadPoolMetricsManager;
 import org.apache.phoenix.monitoring.HistogramDistribution;
 import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.monitoring.PhoenixTableMetric;
 import org.apache.phoenix.monitoring.TableMetricsManager;
 import 
org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesMetricsManager;
-import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -1417,6 +1416,53 @@ public class PhoenixRuntime {
         return TableMetricsManager.getSizeHistogramsForAllTables();
     }
 
    /**
     * Retrieves HTable thread pool utilization and contention metrics collected across all
     * monitored HTable thread pools.
     * <p>
     * Two thread pool dimensions are tracked as histograms:
     * </p>
     * <ul>
     * <li><b>Active threads count</b> - distribution of the number of threads actively
     * executing tasks</li>
     * <li><b>Queue size</b> - distribution of the number of tasks waiting in thread pool
     * queues</li>
     * </ul>
     * <p>
     * Metrics are collected by
     * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}, a ThreadPoolExecutor
     * that instruments HTable operations, for both internal CQSI thread pools (keyed by
     * connection URL) and user-created external thread pools (keyed by custom name). These
     * statistics help identify thread pool bottlenecks and tune pool configuration.
     * </p>
     * <p>
     * Each {@link HistogramDistribution} carries percentile values, min/max, operation counts,
     * and any dimensional tags attached to the underlying histograms.
     * </p>
     * @return a map from histogram identifier (connection URL for CQSI pools, custom name for
     *         external pools) to the list of {@link HistogramDistribution}s for that pool;
     *         empty when no pools are monitored or metrics collection is disabled
     * @see org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats
     * @see HTableThreadPoolMetricsManager
     * @see org.apache.phoenix.monitoring.HTableThreadPoolHistograms
     * @see HistogramDistribution
     */
    public static Map<String, List<HistogramDistribution>> getHTableThreadPoolHistograms() {
        return HTableThreadPoolMetricsManager.getHistogramsForAllThreadPools();
    }
+
     public static Map<String, List<HistogramDistribution>> 
getAllConnectionQueryServicesHistograms() {
         return 
ConnectionQueryServicesMetricsManager.getHistogramsForAllConnectionQueryServices();
     }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
index e3a805c77b..32d011cd9c 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java
@@ -42,12 +42,14 @@ import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
 import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS;
 import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE;
 import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_METRICS_ENABLED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.doAnswer;
 
 import java.lang.reflect.Field;
 import java.sql.Connection;
@@ -58,6 +60,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.lang3.RandomStringUtils;
@@ -71,13 +74,15 @@ import 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilit
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.monitoring.GlobalMetric;
+import org.apache.phoenix.monitoring.HTableThreadPoolHistograms;
+import org.apache.phoenix.monitoring.HTableThreadPoolMetricsManager;
+import org.apache.phoenix.monitoring.HistogramDistribution;
 import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.ConnectionQueryServicesImpl;
-import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.JDBCUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -86,8 +91,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -125,8 +129,9 @@ public class ParallelPhoenixConnectionIT {
         GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_CORE_POOL_SIZE, 
String.valueOf(17));
         GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_THREADS, 
String.valueOf(19));
         GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_QUEUE, 
String.valueOf(23));
-        
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, 
String.valueOf(true));
-
+        
GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
+            String.valueOf(true));
+        GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_METRICS_ENABLED, 
String.valueOf(true));
     }
 
     @AfterClass
@@ -226,6 +231,107 @@ public class ParallelPhoenixConnectionIT {
         }
     }
 
+    @Test
+    public void testCqsiThreadPoolMetricsForParallelConnection() throws 
Exception {
+        try (Connection conn = getParallelConnection()) {
+            ParallelPhoenixConnection pr = 
conn.unwrap(ParallelPhoenixConnection.class);
+
+            // Get details of connection#1
+            PhoenixConnection pConn1 = pr.getFutureConnection1().get();
+            Configuration config1 = 
pConn1.getQueryServices().getConfiguration();
+            String zkQuorum1 = config1.get(HConstants.ZOOKEEPER_QUORUM);
+            String principal1 = config1.get(QueryServices.QUERY_SERVICES_NAME);
+
+            // Get details of connection#2
+            PhoenixConnection pConn2 = pr.getFutureConnection2().get();
+            Configuration config2 = 
pConn2.getQueryServices().getConfiguration();
+            String zkQuorum2 = config2.get(HConstants.ZOOKEEPER_QUORUM);
+            String principal2 = config2.get(QueryServices.QUERY_SERVICES_NAME);
+
+            // Slow down connection#1
+            CountDownLatch latch = new CountDownLatch(1);
+            slowDownConnection(pr, pr.getFutureConnection1(), 
"futureConnection1", latch);
+
+            try (Statement stmt = conn.createStatement()) {
+                
HTableThreadPoolMetricsManager.getHistogramsForAllThreadPools();
+                try (ResultSet rs =
+                    stmt.executeQuery(String.format("SELECT COUNT(*) FROM %s", 
tableName))) {
+                    assertTrue(rs.next());
+                    assertEquals(0, rs.getInt(1));
+                    assertFalse(rs.next());
+                }
+
+                Map<String, List<HistogramDistribution>> htableHistograms =
+                    
HTableThreadPoolMetricsManager.getHistogramsForAllThreadPools();
+
+                // Assert connection#1 CQSI thread pool metrics
+                String conn1HistogramKey = getHistogramKey(config1);
+                
assertHTableThreadPoolHistograms(htableHistograms.get(conn1HistogramKey),
+                    conn1HistogramKey, false, zkQuorum1, principal1);
+
+                // Assert connection#2 CQSI thread pool metrics
+                String conn2HistogramKey = getHistogramKey(config2);
+                
assertHTableThreadPoolHistograms(htableHistograms.get(conn2HistogramKey),
+                    conn2HistogramKey, true, zkQuorum2, principal2);
+
+                // Assert that the CQSI thread pool metrics for both 
connections are different
+                Assert.assertNotEquals(conn1HistogramKey, conn2HistogramKey);
+            } finally {
+                latch.countDown();
+            }
+        }
+    }
+
+    private void slowDownConnection(ParallelPhoenixConnection pr,
+        CompletableFuture<PhoenixConnection> pConn, String 
futureConnectionField,
+        CountDownLatch latch) throws Exception {
+        Assert.assertTrue(futureConnectionField.equals("futureConnection1")
+            || futureConnectionField.equals("futureConnection2"));
+
+        PhoenixConnection spy = Mockito.spy(pConn.get());
+        doAnswer((invocation) -> {
+            // Block the statement creation until the latch is counted down
+            latch.await();
+            return invocation.callRealMethod();
+        }).when(spy).createStatement();
+
+        // Replace the existing CompletableFuture with the spied 
CompletableFuture
+        Field futureField = 
ParallelPhoenixConnection.class.getDeclaredField(futureConnectionField);
+        futureField.setAccessible(true);
+        CompletableFuture<PhoenixConnection> spiedFuture = 
CompletableFuture.completedFuture(spy);
+        futureField.set(pr, spiedFuture);
+
+        // Verify that the spied CompletableFuture has been setup correctly
+        if (futureConnectionField.equals("futureConnection1")) {
+            Assert.assertSame(spy, pr.getFutureConnection1().get());
+        } else {
+            Assert.assertSame(spy, pr.getFutureConnection2().get());
+        }
+    }
+
+    private String getHistogramKey(Configuration config) throws SQLException {
+        String url =
+            QueryUtil.getConnectionUrl(clientProperties, config, 
HBaseTestingUtilityPair.PRINCIPAL);
+        return ConnectionInfo.createNoLogin(url, null, null).toUrl();
+    }
+
+    private void assertHTableThreadPoolHistograms(List<HistogramDistribution> 
histograms,
+        String histogramKey, boolean isUsed, String zkQuorum, String 
principal) {
+        Assert.assertNotNull(histograms);
+        assertEquals(2, histograms.size());
+        for (HistogramDistribution histogram : histograms) {
+            if (isUsed) {
+                assertTrue(histogram.getCount() > 0);
+            } else {
+                assertEquals(0, histogram.getCount());
+            }
+            assertEquals(zkQuorum,
+                
histogram.getTags().get(HTableThreadPoolHistograms.Tag.servers.name()));
+            assertEquals(principal,
+                
histogram.getTags().get(HTableThreadPoolHistograms.Tag.cqsiName.name()));
+        }
+    }
+
     /**
      * Test Phoenix connection creation and basic operations with HBase 
cluster(s) unavailable.
      */
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BaseHTableThreadPoolMetricsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BaseHTableThreadPoolMetricsIT.java
new file mode 100644
index 0000000000..0d73dabdda
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BaseHTableThreadPoolMetricsIT.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.Assert;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.phoenix.monitoring.HTableThreadPoolHistograms.HistogramName;
+
+public class BaseHTableThreadPoolMetricsIT extends BaseTest {
+    private static final int ROWS_TO_LOAD_INITIALLY = 10000;
+
+    protected void assertHistogramTags(
+            Map<String, List<HistogramDistribution>> 
htableThreadPoolHistograms,
+            Map<String, String> expectedTagKeyValues, String histogramKey) 
throws SQLException {
+        List<HistogramDistribution> histograms = 
htableThreadPoolHistograms.get(histogramKey);
+        Assert.assertEquals(2, histograms.size());
+        for (HistogramDistribution histogram : histograms) {
+            Map<String, String> tags = histogram.getTags();
+
+            // Assert that there only expected tag names are there
+            Assert.assertEquals(expectedTagKeyValues.keySet(), tags.keySet());
+
+            for (Map.Entry<String, String> tag : tags.entrySet()) {
+                String tagName = tag.getKey();
+                Assert.assertEquals(expectedTagKeyValues.get(tagName), 
tags.get(tagName));
+            }
+        }
+    }
+
+    protected void assertHTableThreadPoolUsed(
+            Map<String, List<HistogramDistribution>> 
htableThreadPoolHistograms, String histogramKey) {
+        boolean foundActiveThreadsCountHistogram = false;
+        boolean foundQueueSizeHistogram = false;
+
+        List<HistogramDistribution> histograms = 
htableThreadPoolHistograms.get(histogramKey);
+        // Assert each HTableThreadPoolHistograms has 2 HdrHistograms
+        Assert.assertEquals(2, histograms.size());
+        for (HistogramDistribution histogram : histograms) {
+            if 
(histogram.getHistoName().equals(HistogramName.ActiveThreadsCount.name())) {
+
+                // Assert that at least row count no. of requests were 
processed by
+                // HTableThreadPool as we are fetching 1 row per scan RPC
+                Assert.assertTrue(ROWS_TO_LOAD_INITIALLY
+                        <= histogram.getPercentileDistributionMap().get(
+                        PercentileHistogram.NUM_OPS_METRIC_NAME).intValue());
+                foundActiveThreadsCountHistogram = true;
+            }
+            else if 
(histogram.getHistoName().equals(HistogramName.QueueSize.name())) {
+                foundQueueSizeHistogram = true;
+            }
+        }
+
+        // Assert that the HTableThreadPool expected to be used was actually 
used
+        Assert.assertTrue(foundActiveThreadsCountHistogram);
+        Assert.assertTrue(foundQueueSizeHistogram);
+    }
+
+    protected void assertHTableThreadPoolNotUsed(
+            Map<String, List<HistogramDistribution>> 
htableThreadPoolHistograms,
+            String histogramKey) {
+        boolean foundActiveThreadsCountHistogram = false;
+        boolean foundQueueSizeHistogram = false;
+        List<HistogramDistribution> histograms = 
htableThreadPoolHistograms.get(histogramKey);
+        Assert.assertEquals(2, histograms.size());
+        for (HistogramDistribution histogram : histograms) {
+            if 
(histogram.getHistoName().equals(HistogramName.ActiveThreadsCount.name())) {
+                foundActiveThreadsCountHistogram = true;
+            }
+            else if 
(histogram.getHistoName().equals(HistogramName.QueueSize.name())) {
+                foundQueueSizeHistogram = true;
+            }
+            
Assert.assertFalse(histogram.getPercentileDistributionMap().isEmpty());
+            for (long value: 
histogram.getPercentileDistributionMap().values()) {
+                Assert.assertEquals(0, value);
+            }
+        }
+        Assert.assertTrue(foundActiveThreadsCountHistogram);
+        Assert.assertTrue(foundQueueSizeHistogram);
+    }
+
+    protected Map<String, List<HistogramDistribution>> 
runQueryAndGetHistograms(Connection conn,
+                                                                              
String tableName)
+            throws SQLException {
+        Map<String, List<HistogramDistribution>> htableThreadPoolHistograms;
+
+        try (Statement stmt = conn.createStatement()) {
+            // Per row submit one task for execution in HTable thread pool
+            stmt.setFetchSize(1);
+            try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + 
tableName)) {
+                int rowsRead = 0;
+                // Reset the histograms
+                PhoenixRuntime.getHTableThreadPoolHistograms();
+                while (rs.next()) {
+                    rowsRead++;
+                }
+                Assert.assertEquals(ROWS_TO_LOAD_INITIALLY, rowsRead);
+                htableThreadPoolHistograms = 
PhoenixRuntime.getHTableThreadPoolHistograms();
+            }
+        }
+        return htableThreadPoolHistograms;
+    }
+
+    protected void createTableAndUpsertData(Connection conn, String tableName) 
throws SQLException {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE TABLE " + tableName + " (k VARCHAR NOT NULL 
PRIMARY KEY, v "
+                    + "VARCHAR)");
+        }
+        try (PreparedStatement stmt =conn.prepareStatement("UPSERT INTO " + 
tableName
+                + " VALUES (?, ?)")) {
+            for (int i = 1; i <= ROWS_TO_LOAD_INITIALLY; i++) {
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "v" + i);
+                stmt.executeUpdate();
+            }
+            conn.commit();
+        }
+    }
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CQSIThreadPoolMetricsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CQSIThreadPoolMetricsIT.java
new file mode 100644
index 0000000000..7f69578d24
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CQSIThreadPoolMetricsIT.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.AbstractRPCConnectionInfo;
+import org.apache.phoenix.jdbc.ConnectionInfo;
+import org.apache.phoenix.jdbc.ZKConnectionInfo;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.phoenix.jdbc.ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
+import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_METRICS_ENABLED;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_QUERY_SERVICES_NAME;
+
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class CQSIThreadPoolMetricsIT extends BaseHTableThreadPoolMetricsIT {
+
+    private final String registryClassName;
+    private final Properties props = new Properties();
+
+    public CQSIThreadPoolMetricsIT(String registryClassName) {
+        this.registryClassName = registryClassName;
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        final Configuration conf = HBaseConfiguration.create();
+        setUpConfigForMiniCluster(conf);
+        conf.setBoolean(CQSI_THREAD_POOL_METRICS_ENABLED, true);
+        conf.setBoolean(CQSI_THREAD_POOL_ENABLED, true);
+
+        InstanceResolver.clearSingletons();
+        // Override to get required config for static fields loaded that 
require HBase config
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new 
ConfigurationFactory() {
+
+            @Override public Configuration getConfiguration() {
+                return conf;
+            }
+
+            @Override public Configuration getConfiguration(Configuration 
confToClone) {
+                Configuration copy = new Configuration(conf);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+
+        Map<String, String> props = new HashMap<>();
+        props.put(QueryServices.TESTS_MINI_CLUSTER_NUM_MASTERS, "2");
+        setUpTestDriver(new ReadOnlyProps(props));
+    }
+
+    @Before
+    public void testCaseSetup() {
+        props.setProperty(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 
registryClassName);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        driver.cleanUpCQSICache();
+        HTableThreadPoolMetricsManager.clearHTableThreadPoolHistograms();
+        props.clear();
+    }
+
+    @Parameterized.Parameters(name = 
"ExternalHTableThreadPoolMetricsIT_registryClassName={0}")
+    public synchronized static Collection<String> data() {
+        return Arrays.asList(ZKConnectionInfo.ZK_REGISTRY_NAME,
+                "org.apache.hadoop.hbase.client.RpcConnectionRegistry",
+                "org.apache.hadoop.hbase.client.MasterRegistry");
+    }
+
+    @Test
+    public void testHistogramsPerConnInfo() throws Exception {
+        String tableName = generateUniqueName();
+        String histogramKey;
+        String cqsiNameService1 = "service1";
+        String cqsiNameService2 = "service2";
+
+        // Create a connection for "service1" connection profile
+        String url = QueryUtil.getConnectionUrl(props, 
utility.getConfiguration(),
+                cqsiNameService1);
+        Map<String, List<HistogramDistribution>> htableThreadPoolHistograms;
+        try (Connection conn = driver.connect(url, props)) {
+            createTableAndUpsertData(conn, tableName);
+
+            htableThreadPoolHistograms = runQueryAndGetHistograms(conn, 
tableName);
+
+            histogramKey = getHistogramKey(url);
+            assertHTableThreadPoolUsed(htableThreadPoolHistograms, 
histogramKey);
+            Map<String, String> expectedTagKeyValues = 
getExpectedTagKeyValues(url,
+                    cqsiNameService1);
+            assertHistogramTags(htableThreadPoolHistograms, 
expectedTagKeyValues, histogramKey);
+        }
+
+        // Create a connection for "service2" connection profile
+        url = QueryUtil.getConnectionUrl(props, utility.getConfiguration(), 
cqsiNameService2);
+        htableThreadPoolHistograms = 
PhoenixRuntime.getHTableThreadPoolHistograms();
+        // Assert that HTableThreadPoolHistograms for service2 is not there yet
+        
Assert.assertNull(htableThreadPoolHistograms.get(getHistogramKey(url)));
+        try (Connection conn = driver.connect(url, props)) {
+            htableThreadPoolHistograms = runQueryAndGetHistograms(conn, 
tableName);
+
+            assertHTableThreadPoolNotUsed(htableThreadPoolHistograms, 
histogramKey);
+            Map<String, String> expectedTagKeyValues = 
getExpectedTagKeyValues(url,
+                    cqsiNameService1);
+            assertHistogramTags(htableThreadPoolHistograms, 
expectedTagKeyValues, histogramKey);
+
+            histogramKey = getHistogramKey(url);
+            // We have HTableThreadPoolHistograms for service1 and service2 
CQSI instances
+            assertHTableThreadPoolUsed(htableThreadPoolHistograms, 
histogramKey);
+            expectedTagKeyValues = getExpectedTagKeyValues(url,
+                    cqsiNameService2);
+            assertHistogramTags(htableThreadPoolHistograms, 
expectedTagKeyValues, histogramKey);
+        }
+    }
+
+    @Test
+    public void testCQSIThreadPoolHistogramsDisabled() throws Exception {
+        String tableName = generateUniqueName();
+        String cqsiName = "service1";
+        props.setProperty(CQSI_THREAD_POOL_METRICS_ENABLED, "false");
+        props.setProperty(CQSI_THREAD_POOL_ENABLED, "true");
+        String url = QueryUtil.getConnectionUrl(props, 
utility.getConfiguration(), cqsiName);
+        try (Connection conn = driver.connect(url, props)) {
+            createTableAndUpsertData(conn, tableName);
+
+            Map<String, List<HistogramDistribution>> 
htableThreadPoolHistograms =
+                    runQueryAndGetHistograms(conn, tableName);
+            String histogramKey = getHistogramKey(url);
+            Assert.assertNull(htableThreadPoolHistograms.get(histogramKey));
+        }
+    }
+
+    @Test
+    public void testDefaultCQSIHistograms() throws Exception {
+        String tableName = generateUniqueName();
+
+        String url = QueryUtil.getConnectionUrl(props, 
utility.getConfiguration());
+        Map<String, List<HistogramDistribution>> htableThreadPoolHistograms;
+        try (Connection conn = driver.connect(url, props)) {
+            createTableAndUpsertData(conn, tableName);
+
+            htableThreadPoolHistograms = runQueryAndGetHistograms(conn, 
tableName);
+
+            String histogramKey = getHistogramKey(url);
+            assertHTableThreadPoolUsed(htableThreadPoolHistograms, 
histogramKey);
+            Map<String, String> expectedTagKeyValues = 
getExpectedTagKeyValues(url,
+                    DEFAULT_QUERY_SERVICES_NAME);
+            assertHistogramTags(htableThreadPoolHistograms, 
expectedTagKeyValues, histogramKey);
+        }
+    }
+
+    @Test
+    public void testThreadPoolHistogramsSharedAcrossCQSIWithSameConnInfo() 
throws Exception {
+        String tableName = generateUniqueName();
+        String histogramKey;
+        String cqsiName = "service1";
+
+        // Create a connection for "service1" connection profile
+        String url = QueryUtil.getConnectionUrl(props, 
utility.getConfiguration(), cqsiName);
+        Map<String, List<HistogramDistribution>> htableThreadPoolHistograms;
+        try (Connection conn = driver.connect(url, props)) {
+            createTableAndUpsertData(conn, tableName);
+
+            htableThreadPoolHistograms = runQueryAndGetHistograms(conn, 
tableName);
+
+            histogramKey = getHistogramKey(url);
+            assertHTableThreadPoolUsed(htableThreadPoolHistograms, 
histogramKey);
+            Map<String, String> expectedTagKeyValues = 
getExpectedTagKeyValues(url, cqsiName);
+            assertHistogramTags(htableThreadPoolHistograms, 
expectedTagKeyValues, histogramKey);
+        }
+
+        driver.cleanUpCQSICache();
+        try (Connection conn = driver.connect(url, props)) {
+            htableThreadPoolHistograms = runQueryAndGetHistograms(conn, 
tableName);
+            // Assert that no new HTableThreadPoolHistograms instance was 
created for a new CQSI
+            // instance
+            String histogramKeyForDefaultCQSI = 
getHistogramKey(QueryUtil.getConnectionUrl(
+                    new Properties(), utility.getConfiguration()));
+            Set<String> histogramKeySet =
+                    new HashSet<>(Arrays.asList(histogramKeyForDefaultCQSI, 
histogramKey));
+            
Assert.assertTrue(histogramKeySet.containsAll(htableThreadPoolHistograms.keySet()));
+            assertHTableThreadPoolUsed(htableThreadPoolHistograms, 
histogramKey);
+            Map<String, String> expectedTagKeyValues = 
getExpectedTagKeyValues(url, cqsiName);
+            assertHistogramTags(htableThreadPoolHistograms, 
expectedTagKeyValues, histogramKey);
+        }
+    }
+
+    private String getHistogramKey(String url) throws SQLException {
+        return ConnectionInfo.createNoLogin(url, null, null).toUrl();
+    }
+
+    private Map<String, String> getExpectedTagKeyValues(String url, String 
cqsiName)
+            throws SQLException {
+        Map<String, String> expectedTagKeyValues = new HashMap<>();
+        ConnectionInfo connInfo = ConnectionInfo.createNoLogin(url, null, 
null);
+        if (registryClassName.equals(ZKConnectionInfo.ZK_REGISTRY_NAME)) {
+            
expectedTagKeyValues.put(HTableThreadPoolHistograms.Tag.servers.name(),
+                    ((ZKConnectionInfo) connInfo).getZkHosts());
+        }
+        else {
+            
expectedTagKeyValues.put(HTableThreadPoolHistograms.Tag.servers.name(),
+                    ((AbstractRPCConnectionInfo) 
connInfo).getBoostrapServers());
+        }
+        
expectedTagKeyValues.put(HTableThreadPoolHistograms.Tag.cqsiName.name(), 
cqsiName);
+        return expectedTagKeyValues;
+    }
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/ExternalHTableThreadPoolMetricsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/ExternalHTableThreadPoolMetricsIT.java
new file mode 100644
index 0000000000..8a043b5572
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/ExternalHTableThreadPoolMetricsIT.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.ZKConnectionInfo;
+import org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.query.HTableFactory;
+import org.apache.phoenix.query.QueryServices;
+import 
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static 
org.apache.phoenix.jdbc.ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
+
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class ExternalHTableThreadPoolMetricsIT extends 
BaseHTableThreadPoolMetricsIT{
+    private static final int MAX_THREADS_IN_EXTERNAL_THREAD_POOL = 10;
+    private static final int QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL = 100;
+    private static final String TAG_NAME = "cluster";
+    private static final Map<String, String> tagValues = new HashMap<>();
+    private static final String THREAD_POOL_1A = 
"external_thread_pool_1".toUpperCase();
+    private static final String THREAD_POOL_2A = 
"external_thread_pool_2".toUpperCase();
+    private static final String HISTOGRAM_DISABLED_THREAD_POOL =
+            "histogram_disabled_thread_pool".toUpperCase();
+    private static final String NULL_SUPPLIER_THREAD_POOL = 
"null_supplier_thread_pool".toUpperCase();
+    private static final String NO_TAGS_THREAD_POOL = 
"no_tags_thread_pool".toUpperCase();
+
+    private final String registryClassName;
+    private final Properties props = new Properties();
+
    /** @param registryClassName connection registry implementation exercised by this run */
    public ExternalHTableThreadPoolMetricsIT(String registryClassName) {
        this.registryClassName = registryClassName;
    }
+
+    private static ThreadPoolExecutor createThreadPoolExecutor(String 
threadPoolName) {
+        Supplier<HTableThreadPoolHistograms> supplier =
+                getHTableThreadPoolHistogramsSupplier(threadPoolName);
+        BlockingQueue<Runnable> workQueue =
+                new 
LinkedBlockingQueue<>(QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL);
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat(threadPoolName + "-shared-pool-%d")
+                .setUncaughtExceptionHandler(
+                        Threads.LOGGING_EXCEPTION_HANDLER)
+                .build();
+        return new HTableThreadPoolWithUtilizationStats(
+                MAX_THREADS_IN_EXTERNAL_THREAD_POOL, 
MAX_THREADS_IN_EXTERNAL_THREAD_POOL,
+                30, TimeUnit.SECONDS, workQueue, threadFactory,
+                threadPoolName, supplier) {
+                    @Override
+                    public void execute(Runnable command) {
+                        super.execute(command);
+                    }
+                };
+    }
+
+    private static Supplier<HTableThreadPoolHistograms> 
getHTableThreadPoolHistogramsSupplier(
+            String threadPoolName) {
+        Supplier<HTableThreadPoolHistograms> supplier;
+        if (threadPoolName.equals(HISTOGRAM_DISABLED_THREAD_POOL)) {
+            supplier = null;
+        }
+        else if (threadPoolName.equals(NULL_SUPPLIER_THREAD_POOL)) {
+            supplier = new Supplier<HTableThreadPoolHistograms>() {
+                @Override
+                public HTableThreadPoolHistograms get() {
+                    return null;
+                }
+            };
+        }
+        else if (threadPoolName.equals(NO_TAGS_THREAD_POOL)) {
+            supplier = new Supplier<HTableThreadPoolHistograms>() {
+                @Override
+                public HTableThreadPoolHistograms get() {
+                    return new 
HTableThreadPoolHistograms(MAX_THREADS_IN_EXTERNAL_THREAD_POOL,
+                            QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL);
+                }
+            };
+        }
+        else {
+            supplier = new Supplier<HTableThreadPoolHistograms>() {
+                @Override
+                public HTableThreadPoolHistograms get() {
+                    HTableThreadPoolHistograms hTableThreadPoolHistograms =
+                            new 
HTableThreadPoolHistograms(MAX_THREADS_IN_EXTERNAL_THREAD_POOL,
+                                    QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL);
+                    hTableThreadPoolHistograms.addTag(TAG_NAME, 
tagValues.get(threadPoolName));
+                    return hTableThreadPoolHistograms;
+                }
+            };
+        }
+        return supplier;
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        tagValues.put(THREAD_POOL_1A, "hbase1a");
+        tagValues.put(THREAD_POOL_2A, "hbase2a");
+
+        final Configuration conf = HBaseConfiguration.create();
+        setUpConfigForMiniCluster(conf);
+
+        InstanceResolver.clearSingletons();
+        // Override to get required config for static fields loaded that 
require HBase config
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new 
ConfigurationFactory() {
+
+            @Override public Configuration getConfiguration() {
+                return conf;
+            }
+
+            @Override public Configuration getConfiguration(Configuration 
confToClone) {
+                Configuration copy = new Configuration(conf);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+        ThreadPoolExecutor executorFor1a = 
createThreadPoolExecutor(THREAD_POOL_1A);
+        ThreadPoolExecutor executorFor2a = 
createThreadPoolExecutor(THREAD_POOL_2A);
+        ThreadPoolExecutor histogramDisabledExecutor =
+                createThreadPoolExecutor(HISTOGRAM_DISABLED_THREAD_POOL);
+        ThreadPoolExecutor nullSupplierExecutor =
+                createThreadPoolExecutor(NULL_SUPPLIER_THREAD_POOL);
+        ThreadPoolExecutor defaultExecutor = 
createThreadPoolExecutor(NO_TAGS_THREAD_POOL);
+        InstanceResolver.getSingleton(HTableFactory.class, new 
HTableFactory.HTableFactoryImpl() {
+            @Override
+            public Table getTable(byte[] tableName,
+                                  org.apache.hadoop.hbase.client.Connection 
connection,
+                                  ExecutorService pool) throws IOException {
+                if 
(Bytes.toString(tableName).startsWith(HISTOGRAM_DISABLED_THREAD_POOL)) {
+                    return super.getTable(tableName, connection, 
histogramDisabledExecutor);
+                }
+                else if 
(Bytes.toString(tableName).startsWith(NULL_SUPPLIER_THREAD_POOL)) {
+                    return super.getTable(tableName, connection, 
nullSupplierExecutor);
+                }
+                else if (Bytes.toString(tableName).startsWith(THREAD_POOL_1A)) 
{
+                    return super.getTable(tableName, connection, 
executorFor1a);
+                }
+                else if (Bytes.toString(tableName).startsWith(THREAD_POOL_2A)) 
{
+                    return super.getTable(tableName, connection, 
executorFor2a);
+                }
+                else {
+                    return super.getTable(tableName, connection, 
defaultExecutor);
+                }
+            }
+        });
+
+        Map<String, String> props = new HashMap<>();
+        props.put(QueryServices.TESTS_MINI_CLUSTER_NUM_MASTERS, "2");
+        setUpTestDriver(new ReadOnlyProps(props));
+    }
+
+    @Before
+    public void testCaseSetup() {
+        props.setProperty(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 
registryClassName);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        HTableThreadPoolMetricsManager.clearHTableThreadPoolHistograms();
+        props.clear();
+    }
+
+    @Parameterized.Parameters(name = 
"ExternalHTableThreadPoolMetricsIT_registryClassName={0}")
+    public synchronized static Collection<String> data() {
+        return Arrays.asList(ZKConnectionInfo.ZK_REGISTRY_NAME,
+                "org.apache.hadoop.hbase.client.RpcConnectionRegistry",
+                "org.apache.hadoop.hbase.client.MasterRegistry");
+    }
+
+    @Test
+    public void testHistogramsPerHTableThreadPool() throws Exception {
+        String tableName = THREAD_POOL_1A + "." + generateUniqueName();
+
+        Map<String, List<HistogramDistribution>> htableThreadPoolHistograms;
+        String url = QueryUtil.getConnectionUrl(props, 
utility.getConfiguration());
+
+        // Send traffic to HTable thread pool for hbase1a
+        try (Connection conn = driver.connect(url, props)) {
+            createTableAndUpsertData(conn, tableName);
+
+            htableThreadPoolHistograms = runQueryAndGetHistograms(conn, 
tableName);
+            assertHTableThreadPoolUsed(htableThreadPoolHistograms, 
THREAD_POOL_1A);
+            Assert.assertNull(htableThreadPoolHistograms.get(THREAD_POOL_2A));
+
+            Map<String, String> expectedTagKeyValues = new HashMap<>();
+            expectedTagKeyValues.put(TAG_NAME, tagValues.get(THREAD_POOL_1A));
+            assertHistogramTags(htableThreadPoolHistograms, 
expectedTagKeyValues, THREAD_POOL_1A);
+        }
+
+        // Send traffic to HTable thread pool for hbase2a
+        tableName = THREAD_POOL_2A + "." + generateUniqueName();
+        try (Connection conn = driver.connect(url, props)) {
+            createTableAndUpsertData(conn, tableName);
+
+            htableThreadPoolHistograms = runQueryAndGetHistograms(conn, 
tableName);
+
+            // We will have a HTable thread pool for hbase1a also
+            assertHTableThreadPoolUsed(htableThreadPoolHistograms, 
THREAD_POOL_2A);
+            assertHTableThreadPoolNotUsed(htableThreadPoolHistograms, 
THREAD_POOL_1A);
+
+            Map<String, String> expectedTagKeyValues = new HashMap<>();
+            expectedTagKeyValues.put(TAG_NAME, tagValues.get(THREAD_POOL_1A));
+            assertHistogramTags(htableThreadPoolHistograms, 
expectedTagKeyValues, THREAD_POOL_1A);
+
+            expectedTagKeyValues.put(TAG_NAME, tagValues.get(THREAD_POOL_2A));
+            assertHistogramTags(htableThreadPoolHistograms, 
expectedTagKeyValues, THREAD_POOL_2A);
+        }
+    }
+
+    @Test
+    public void testHistogramDisabled() throws Exception {
+        String tableName = HISTOGRAM_DISABLED_THREAD_POOL + "." + 
generateUniqueName();
+
+        Map<String, List<HistogramDistribution>> htableThreadPoolHistograms;
+        String url = QueryUtil.getConnectionUrl(props, 
utility.getConfiguration());
+
+        try (Connection conn = driver.connect(url, props)) {
+            createTableAndUpsertData(conn, tableName);
+            htableThreadPoolHistograms = runQueryAndGetHistograms(conn, 
tableName);
+            
Assert.assertNull(htableThreadPoolHistograms.get(HISTOGRAM_DISABLED_THREAD_POOL));
+        }
+    }
+
+    @Test
+    public void testNullHistogramSupplier() throws Exception {
+        String tableName = NULL_SUPPLIER_THREAD_POOL + "." + 
generateUniqueName();
+
+        Map<String, List<HistogramDistribution>> htableThreadPoolHistograms;
+        String url = QueryUtil.getConnectionUrl(props, 
utility.getConfiguration());
+
+        try (Connection conn = driver.connect(url, props)) {
+            createTableAndUpsertData(conn, tableName);
+            htableThreadPoolHistograms = runQueryAndGetHistograms(conn, 
tableName);
+            
Assert.assertNull(htableThreadPoolHistograms.get(NULL_SUPPLIER_THREAD_POOL));
+        }
+    }
+
+    @Test
+    public void testHistogramsWithoutTags() throws Exception {
+        String tableName = generateUniqueName();
+        Map<String, List<HistogramDistribution>> htableThreadPoolHistograms;
+
+        String url = QueryUtil.getConnectionUrl(props, 
utility.getConfiguration());
+        try (Connection conn = driver.connect(url, props)) {
+            createTableAndUpsertData(conn, tableName);
+            htableThreadPoolHistograms = runQueryAndGetHistograms(conn, 
tableName);
+            assertHTableThreadPoolUsed(htableThreadPoolHistograms, 
NO_TAGS_THREAD_POOL);
+            assertHistogramTags(htableThreadPoolHistograms, new HashMap<>(), 
NO_TAGS_THREAD_POOL);
+        }
+    }
+}
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java 
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
index b5f6d67a80..26c160c16b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
@@ -126,6 +126,19 @@ public class PhoenixTestDriver extends 
PhoenixEmbeddedDriver {
         }
     }
 
+    public void cleanUpCQSICache() throws SQLException {
+        lockInterruptibly(LockMode.WRITE);
+        try {
+            for (ConnectionQueryServices service : 
connectionQueryServicesMap.values()) {
+                service.close();
+            }
+            connectionQueryServicesMap.clear();
+        }
+        finally {
+            unlock(LockMode.WRITE);
+        }
+    }
+
     @GuardedBy("closeLock")
     private void checkClosed() {
         if (closed) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index a888a8ac1a..3e9775e216 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -126,6 +126,7 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.RegionMetrics;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -559,8 +560,14 @@ public abstract class BaseTest {
         utility = new HBaseTestingUtility(conf);
         try {
             long startTime = System.currentTimeMillis();
-            utility.startMiniCluster(overrideProps.getInt(
-                    QueryServices.TESTS_MINI_CLUSTER_NUM_REGION_SERVERS, 
NUM_SLAVES_BASE));
+            StartMiniClusterOption.Builder builder = 
StartMiniClusterOption.builder();
+            
builder.numMasters(overrideProps.getInt(QueryServices.TESTS_MINI_CLUSTER_NUM_MASTERS,
+                    1));
+            int numSlaves = overrideProps.getInt(
+                    QueryServices.TESTS_MINI_CLUSTER_NUM_REGION_SERVERS, 
NUM_SLAVES_BASE);
+            builder.numRegionServers(numSlaves);
+            builder.numDataNodes(numSlaves);
+            utility.startMiniCluster(builder.build());
             long startupTime = System.currentTimeMillis()-startTime;
             LOGGER.info("HBase minicluster startup complete in {} ms", 
startupTime);
             return getLocalClusterUrl(utility);

Reply via email to