This is an automated email from the ASF dual-hosted git repository. sanjeet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 4526040e29 PHOENIX-7626: Add metrics to capture HTable thread pool utilization and contention (#2169) 4526040e29 is described below commit 4526040e297f321203e026fb89c4e0563f84bd06 Author: sanjeet006py <36011005+sanjeet00...@users.noreply.github.com> AuthorDate: Tue Jun 24 13:31:58 2025 +0530 PHOENIX-7626: Add metrics to capture HTable thread pool utilization and contention (#2169) --- .../job/HTableThreadPoolWithUtilizationStats.java | 142 ++++++++++ .../monitoring/HTableThreadPoolHistograms.java | 176 ++++++++++++ .../monitoring/HTableThreadPoolMetricsManager.java | 158 +++++++++++ .../phoenix/monitoring/HistogramDistribution.java | 10 + .../phoenix/monitoring/PercentileHistogram.java | 162 +++++++++++ .../PercentileHistogramDistribution.java | 101 +++++++ .../phoenix/monitoring/UtilizationHistogram.java | 62 +++++ .../phoenix/query/ConnectionQueryServicesImpl.java | 54 +++- .../org/apache/phoenix/query/QueryServices.java | 4 + .../apache/phoenix/query/QueryServicesOptions.java | 14 +- .../org/apache/phoenix/util/PhoenixRuntime.java | 50 +++- .../phoenix/jdbc/ParallelPhoenixConnectionIT.java | 118 +++++++- .../monitoring/BaseHTableThreadPoolMetricsIT.java | 143 ++++++++++ .../monitoring/CQSIThreadPoolMetricsIT.java | 250 +++++++++++++++++ .../ExternalHTableThreadPoolMetricsIT.java | 299 +++++++++++++++++++++ .../org/apache/phoenix/jdbc/PhoenixTestDriver.java | 13 + .../java/org/apache/phoenix/query/BaseTest.java | 11 +- 17 files changed, 1747 insertions(+), 20 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/job/HTableThreadPoolWithUtilizationStats.java b/phoenix-core-client/src/main/java/org/apache/phoenix/job/HTableThreadPoolWithUtilizationStats.java new file mode 100644 index 0000000000..fab9865a33 --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/job/HTableThreadPoolWithUtilizationStats.java @@ -0,0 +1,142 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.job; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.monitoring.HTableThreadPoolHistograms; +import org.apache.phoenix.monitoring.HTableThreadPoolMetricsManager; +import org.apache.phoenix.util.PhoenixRuntime; + + +/** + * <b>External User-Facing API</b> + * <p> + * A specialized ThreadPoolExecutor designed specifically for capturing HTable thread pool + * utilization statistics. This class extends the standard ThreadPoolExecutor with built-in + * instrumentation to automatically collect key utilization metrics including active thread count + * and queue size. + * </p> + * <h3>Purpose</h3> + * <p> + * Use this ThreadPoolExecutor when you need to monitor and analyze the performance characteristics + * of HTable thread pool. The collected metrics help in understanding thread pool behavior, + * identifying bottlenecks, and optimizing thread pool configurations. 
+ * </p> + * <h3>Setup and Configuration</h3> + * <p> + * When instantiating this thread pool executor, you must provide an idempotent supplier that + * returns an instance of {@link HTableThreadPoolHistograms}. This supplier enables the collection + * of utilization statistics. Within the supplier, you can also attach custom tags to the + * {@link HTableThreadPoolHistograms} instance for enhanced monitoring and filtering capabilities. + * </p> + * <p> + * <b>Important:</b> If you pass a null supplier, metrics collection will be completely disabled. + * This can be useful in scenarios where you want to use the thread pool without the overhead of + * collecting utilization statistics. + * </p> + * <h3>Consuming Metrics</h3> + * <p> + * To retrieve the collected metrics as percentile distributions: + * </p> + * <ol> + * <li>Call {@link PhoenixRuntime#getHTableThreadPoolHistograms()}</li> + * <li>Use the htableThreadPoolHistogramsName as the key to retrieve the list of + * {@link org.apache.phoenix.monitoring.PercentileHistogramDistribution} instances</li> + * <li>Each metric type will have its own distribution instance in the returned list</li> + * </ol> + * <p> + * Refer to the {@link org.apache.phoenix.monitoring.PercentileHistogramDistribution} documentation + * to understand how to extract percentile values from the recorded data. 
+ * </p> + * <h3>Usage Examples</h3> + * <p> + * For comprehensive usage examples and best practices, refer to the following integration tests: + * </p> + * <ul> + * <li>CQSIThreadPoolMetricsIT</li> + * <li>ExternalHTableThreadPoolMetricsIT</li> + * </ul> + * @see HTableThreadPoolHistograms + * @see PhoenixRuntime#getHTableThreadPoolHistograms() + * @see org.apache.phoenix.monitoring.PercentileHistogramDistribution + */ +public class HTableThreadPoolWithUtilizationStats extends ThreadPoolExecutor { + + private final String htableThreadPoolHistogramsName; + private final Supplier<HTableThreadPoolHistograms> hTableThreadPoolHistogramsSupplier; + + /** + * Creates a new HTable thread pool executor with built-in utilization statistics collection. + * <p> + * This constructor accepts all the standard {@link ThreadPoolExecutor} parameters plus + * additional parameters specific to HTable thread pool monitoring. The thread pool will + * automatically collect utilization metrics (active thread count and queue size) during task + * execution. To retrieve the collected metrics, use + * {@link PhoenixRuntime#getHTableThreadPoolHistograms()}. + * </p> + * @param htableThreadPoolHistogramsName Unique identifier for this thread pool's metrics. This + * name serves as the key in the metrics map returned by + * {@link PhoenixRuntime#getHTableThreadPoolHistograms()}. + * Choose a descriptive name that identifies the purpose + * of this thread pool. + * @param supplier Idempotent supplier that provides the + * {@link HTableThreadPoolHistograms} instance for metrics + * collection. This supplier will be called only the first + * time a metric is recorded for the given + * htableThreadPoolHistogramsName. Subsequent metric + * recordings will reuse the same histogram instance. + * <b>Pass null to disable metrics collection + * entirely</b>, which eliminates monitoring overhead but + * provides no utilization statistics. 
+ * @see ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, + * ThreadFactory) + * @see HTableThreadPoolHistograms + * @see PhoenixRuntime#getHTableThreadPoolHistograms() + */ + public HTableThreadPoolWithUtilizationStats(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + BlockingQueue<Runnable> workQueue, + ThreadFactory threadFactory, + String htableThreadPoolHistogramsName, + Supplier<HTableThreadPoolHistograms> supplier) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + this.htableThreadPoolHistogramsName = htableThreadPoolHistogramsName; + this.hTableThreadPoolHistogramsSupplier = supplier; + } + + public void execute(Runnable runnable) { + Preconditions.checkNotNull(runnable); + if (hTableThreadPoolHistogramsSupplier != null) { + HTableThreadPoolMetricsManager.updateActiveThreads(htableThreadPoolHistogramsName, + this.getActiveCount(), hTableThreadPoolHistogramsSupplier); + // Should we offset queue size by available threads if CorePoolSize == MaxPoolSize? + // Tasks will first be put into thread pool's queue and will be consumed by a worker + // thread waiting for tasks to arrive in queue. But while a task is in queue, queue + // size > 0 though active no. of threads might be less than MaxPoolSize. 
+ HTableThreadPoolMetricsManager.updateQueueSize(htableThreadPoolHistogramsName, + this.getQueue().size(), hTableThreadPoolHistogramsSupplier); + } + super.execute(runnable); + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolHistograms.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolHistograms.java new file mode 100644 index 0000000000..4bdd8fd65a --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolHistograms.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import java.util.List; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; + +/** + * A collection of histograms that monitor HTable thread pool performance metrics including thread + * utilization and queue contention. This class tracks two key metrics to help identify thread pool + * bottlenecks and performance issues. 
<br/> + * <br/> + * <b>External User-Facing Class:</b><br/> + * This is an external user-facing class that should only be used in conjunction with + * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}. When creating an instance of + * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}, a + * {@link java.util.function.Supplier} of this class must be provided. <br/> + * <br/> + * <b>Monitored Metrics:</b><br/> + * <ul> + * <li><b>Active Threads Count</b> - Number of threads currently executing tasks</li> + * <li><b>Queue Size</b> - Number of tasks waiting in the thread pool queue</li> + * </ul> + * <br/> + * Each metric is captured using a {@link UtilizationHistogram} that provides percentile + * distributions, min/max values, and operation counts for comprehensive analysis. <br/> + * <br/> + * <b>Tags:</b><br/> + * Supports metadata tags for dimensional monitoring: + * <ul> + * <li><b>servers</b> - Connection quorum string (ZK quorum, master quorum, etc.)</li> + * <li><b>cqsiName</b> - Principal identifier for CQSI instance differentiation</li> + * <li>Custom tags via {@link #addTag(String, String)}</li> + * </ul> + * The instance created by a supplier can also add tags to provide additional context. <br/> + * <br/> + * <b>Instance Management:</b><br/> + * For instances created internally by the CQSI class: One instance per unique connection info is + * created at the CQSI level, and multiple CQSI instances having the same connection info will share + * the same instance of this class. <br/> + * For instances created externally by users: This instance management detail does not apply, and + * users have full control over instance creation. <br/> + * <br/> + * Use {@link #getThreadPoolHistogramsDistribution()} to retrieve immutable snapshots of the + * collected metrics for monitoring and analysis. 
+ */ +public class HTableThreadPoolHistograms { + /** + * Predefined tag keys for dimensional monitoring and contextual categorization of histogram + * instances. These tags provide context about the connection and CQSI instance associated with + * the metrics. + */ + public enum Tag { + servers, + cqsiName, + } + + /** + * Enum that captures the name of each of the monitored metrics. These names correspond to the + * specific thread pool performance metrics being tracked. + */ + public enum HistogramName { + ActiveThreadsCount, + QueueSize, + } + + private final UtilizationHistogram activeThreadsHisto; + private final UtilizationHistogram queuedSizeHisto; + + /** + * Creates a new instance of HTableThreadPoolHistograms with the specified maximum values for + * thread pool and queue size. + * @param maxThreadPoolSize the maximum number of threads in the thread pool, used to configure + * the active threads histogram + * @param maxQueueSize the maximum size of the thread pool queue, used to configure the + * queue size histogram + */ + public HTableThreadPoolHistograms(long maxThreadPoolSize, long maxQueueSize) { + activeThreadsHisto = new UtilizationHistogram(maxThreadPoolSize, + HistogramName.ActiveThreadsCount.name()); + queuedSizeHisto = new UtilizationHistogram(maxQueueSize, HistogramName.QueueSize.name()); + } + + /** + * Updates the histogram that tracks active threads count with the current number of active + * threads. <br/> + * <br/> + * This method is to be called from HTableThreadPoolWithUtilizationStats class only and should not + * be used from outside Phoenix. + * @param activeThreads the current number of threads actively executing tasks + */ + public void updateActiveThreads(long activeThreads) { + activeThreadsHisto.addValue(activeThreads); + } + + /** + * Updates the histogram that tracks queue size with the current number of queued tasks. 
<br/> + * <br/> + * This method is to be called from HTableThreadPoolWithUtilizationStats class only and should not + * be used from outside Phoenix. + * @param queuedSize the current number of tasks waiting in the thread pool queue + */ + public void updateQueuedSize(long queuedSize) { + queuedSizeHisto.addValue(queuedSize); + } + + /** + * Adds a server tag for dimensional monitoring that identifies the connection quorum string + * such as ZK quorum, master quorum, etc. This corresponds to the "servers" tag key. <br/> + * <br/> + * This is an external user-facing method which can be called when creating an instance of the + * class. + * @param value the connection quorum string value + */ + public void addServerTag(String value) { + addTag(Tag.servers.name(), value); + } + + /** + * Adds a CQSI name tag that captures the principal of the CQSI instance. This corresponds to + * the "cqsiName" tag key. <br/> + * <br/> + * This is an external user-facing method which can be called while creating an instance of the + * class. + * @param value the principal identifier for the CQSI instance + */ + public void addCqsiNameTag(String value) { + addTag(Tag.cqsiName.name(), value); + } + + /** + * Adds a custom tag with the specified key-value pair for dimensional monitoring. This method + * allows adding arbitrary tags beyond the predefined servers and CQSI name tags. <br/> + * <br/> + * This is an external user-facing method which can be called while creating an instance of the + * class. + * @param key the tag key + * @param value the tag value + */ + public void addTag(String key, String value) { + activeThreadsHisto.addTag(key, value); + queuedSizeHisto.addTag(key, value); + } + + /** + * Returns a list of HistogramDistribution which are immutable snapshots containing percentile + * distribution, min/max values, and count of values for the monitored metrics (active threads + * count and queue size). 
<br/> + * <br/> + * This method is to be called from + * {@link HTableThreadPoolMetricsManager#getHistogramsForAllThreadPools()} only and should not + * be used from outside Phoenix. + * @return list of HistogramDistribution instances representing comprehensive snapshots of the + * metrics + */ + public List<HistogramDistribution> getThreadPoolHistogramsDistribution() { + return ImmutableList.of(activeThreadsHisto.getPercentileHistogramDistribution(), + queuedSizeHisto.getPercentileHistogramDistribution()); + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolMetricsManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolMetricsManager.java new file mode 100644 index 0000000000..52843ece87 --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HTableThreadPoolMetricsManager.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.monitoring; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Central registry and manager for HTable thread pool utilization and contention metrics. + * <p> + * <b>Internal Use Only:</b> This class is designed for internal use only and should not be used + * directly from outside Phoenix. External consumers should access thread pool metrics through + * {@link org.apache.phoenix.util.PhoenixRuntime#getHTableThreadPoolHistograms()}. + * </p> + * <p> + * This class serves as a singleton registry that maintains {@link HTableThreadPoolHistograms} + * instances across the entire application lifecycle. Each {@link HTableThreadPoolHistograms} + * instance contains utilization metrics (active thread count and queue size histograms) for a + * specific HTable thread pool identified by a unique histogram key. 
+ * </p> + * <h3>Storage and Instance Management</h3> + * <p> + * The manager stores {@link HTableThreadPoolHistograms} instances in a thread-safe + * {@link ConcurrentHashMap} with the following characteristics: + * </p> + * <ul> + * <li><b>Key-based storage:</b> Each histogram instance is identified by a unique string key</li> + * <li><b>Lazy initialization:</b> Instances are created on-demand when first accessed</li> + * <li><b>Shared instances:</b> Multiple threads/components can share the same histogram instance + * using the same key</li> + * <li><b>Thread-safe operations:</b> All storage operations are atomic and thread-safe</li> + * </ul> + * <h3>Integration with HTable Thread Pool Utilization</h3> + * <p> + * This manager is exclusively used by + * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}, a specialized + * ThreadPoolExecutor that automatically captures HTable thread pool utilization statistics. The + * integration workflow is: + * </p> + * <ol> + * <li>The thread pool calls {@link #updateActiveThreads} and {@link #updateQueueSize} to record + * metrics</li> + * <li>Metrics are stored in {@link HTableThreadPoolHistograms} instances managed by this class</li> + * <li>External consumers access immutable metric snapshots through + * {@link org.apache.phoenix.util.PhoenixRuntime#getHTableThreadPoolHistograms()}, which internally + * calls {@link #getHistogramsForAllThreadPools()} to return {@link HistogramDistribution} instances + * containing percentile distributions, min/max values, and operation counts for both active thread + * count and queue size metrics</li> + * </ol> + * <h3>Usage Patterns</h3> + * <p> + * <b>CQSI Level:</b> For ConnectionQueryServicesImpl thread pools, the histogram key is always the + * connection URL, allowing multiple CQSI instances with the same connection info to share the same + * {@link HTableThreadPoolHistograms} instance. 
+ * </p> + * <p> + * <b>External Thread Pools:</b> For user-defined thread pools, the histogram key can be the thread + * pool name or any unique identifier chosen by the application. + * </p> + * @see HTableThreadPoolHistograms + * @see org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats + * @see org.apache.phoenix.util.PhoenixRuntime#getHTableThreadPoolHistograms() + */ +public class HTableThreadPoolMetricsManager { + + private static final Logger LOGGER = + LoggerFactory.getLogger(HTableThreadPoolMetricsManager.class); + + private static final ConcurrentHashMap<String, HTableThreadPoolHistograms> + THREAD_POOL_HISTOGRAMS_MAP = new ConcurrentHashMap<>(); + + private HTableThreadPoolMetricsManager() { + } + + public static Map<String, List<HistogramDistribution>> getHistogramsForAllThreadPools() { + Map<String, List<HistogramDistribution>> map = new HashMap<>(); + for (Map.Entry<String, HTableThreadPoolHistograms> entry + : THREAD_POOL_HISTOGRAMS_MAP.entrySet()) { + HTableThreadPoolHistograms hTableThreadPoolHistograms = entry.getValue(); + map.put(entry.getKey(), + hTableThreadPoolHistograms.getThreadPoolHistogramsDistribution()); + } + return map; + } + + private static HTableThreadPoolHistograms getThreadPoolHistograms( + String histogramKey, Supplier<HTableThreadPoolHistograms> supplier) { + if (supplier == null) { + return null; + } + return THREAD_POOL_HISTOGRAMS_MAP.computeIfAbsent(histogramKey, k -> supplier.get()); + } + + /** + * Records the current number of active threads in the thread pool in the histogram. + * @param histogramKey Key to uniquely identify {@link HTableThreadPoolHistograms} instance. + * @param activeThreads Number of active threads in the thread pool. + * @param supplier An idempotent supplier of {@link HTableThreadPoolHistograms}. 
+ */ + public static void updateActiveThreads(String histogramKey, int activeThreads, + Supplier<HTableThreadPoolHistograms> supplier) { + HTableThreadPoolHistograms hTableThreadPoolHistograms = + getThreadPoolHistograms(histogramKey, supplier); + if (hTableThreadPoolHistograms != null) { + hTableThreadPoolHistograms.updateActiveThreads(activeThreads); + } else { + logWarningForNullSupplier(histogramKey); + } + } + + /** + * Records the current number of tasks in the thread pool's queue in the histogram. + * @param histogramKey Key to uniquely identify {@link HTableThreadPoolHistograms} instance. + * @param queueSize Number of tasks in the HTable thread pool's queue. + * @param supplier An idempotent supplier of {@link HTableThreadPoolHistograms}. + */ + public static void updateQueueSize(String histogramKey, int queueSize, + Supplier<HTableThreadPoolHistograms> supplier) { + HTableThreadPoolHistograms hTableThreadPoolHistograms = + getThreadPoolHistograms(histogramKey, supplier); + if (hTableThreadPoolHistograms != null) { + hTableThreadPoolHistograms.updateQueuedSize(queueSize); + } else { + logWarningForNullSupplier(histogramKey); + } + } + + private static void logWarningForNullSupplier(String threadPoolName) { + LOGGER.warn("No HTable thread pool histograms created for thread pool {}", threadPoolName); + } + + @VisibleForTesting + public static void clearHTableThreadPoolHistograms() { + THREAD_POOL_HISTOGRAMS_MAP.clear(); + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HistogramDistribution.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HistogramDistribution.java index 4e8039c27f..269bd65c9c 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HistogramDistribution.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/HistogramDistribution.java @@ -19,6 +19,8 @@ package org.apache.phoenix.monitoring; import java.util.Map; +import 
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + public interface HistogramDistribution { public long getMin(); @@ -30,4 +32,12 @@ public interface HistogramDistribution { public Map<String, Long> getRangeDistributionMap(); + default ImmutableMap<String, Long> getPercentileDistributionMap() { + throw new UnsupportedOperationException("Percentile Histogram Distribution is not " + + "supported!!"); + } + + default ImmutableMap<String, String> getTags() { + return ImmutableMap.of(); + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogram.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogram.java new file mode 100644 index 0000000000..adbaabdc55 --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogram.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.monitoring; + +import java.util.HashMap; +import java.util.Map; + +import org.HdrHistogram.Histogram; +import org.HdrHistogram.Recorder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract base class for creating percentile-based histograms that capture and analyze the + * distribution of recorded values. This histogram tracks values up to a specified maximum and + * provides statistical analysis including percentile distributions, min/max values, and operation + * counts. + * <p> + * <b>Internal Use Only:</b> This class is for internal use only and should not be used directly by + * users of Phoenix. + * </p> + * <br/> + * Key features: + * <ul> + * <li>Records values efficiently using {@link org.HdrHistogram.Histogram} internally</li> + * <li>Generates percentile distributions through concrete implementations</li> + * <li>Supports optional metadata tags for enhanced monitoring context</li> + * <li>Generates immutable statistical snapshots with percentile analysis</li> + * <li>Automatically handles values exceeding the maximum trackable limit</li> + * </ul> + * <br/> + * Usage workflow: + * <ol> + * <li>Record values using {@link #addValue(long)}</li> + * <li>Optionally attach metadata using {@link #addTag(String, String)}</li> + * <li>Generate distribution snapshots via {@link #getPercentileHistogramDistribution()}</li> + * </ol> + * <br/> + * Concrete implementations must define {@link #generateDistributionMap(Histogram)} to specify which + * percentiles and statistics to include in the distribution. + */ +abstract class PercentileHistogram { + private static final Logger LOGGER = LoggerFactory.getLogger(PercentileHistogram.class); + + // Strings used to create metrics names. 
+ static final String NUM_OPS_METRIC_NAME = "_num_ops"; + static final String MIN_METRIC_NAME = "_min"; + static final String MAX_METRIC_NAME = "_max"; + static final String MEDIAN_METRIC_NAME = "_median"; + static final String TWENTY_FIFTH_PERCENTILE_METRIC_NAME = "_25th_percentile"; + static final String SEVENTY_FIFTH_PERCENTILE_METRIC_NAME = "_75th_percentile"; + static final String NINETIETH_PERCENTILE_METRIC_NAME = "_90th_percentile"; + static final String NINETY_FIFTH_PERCENTILE_METRIC_NAME = "_95th_percentile"; + + private Histogram prevHistogram = null; + /** + * The recorder that records the values in a {@link Histogram}. + */ + private final Recorder recorder; + private final String name; + private final long maxUtil; + private Map<String, String> tags = null; + + PercentileHistogram(long maxUtil, String name) { + this.name = name; + this.maxUtil = maxUtil; + this.recorder = new Recorder(maxUtil, 2); + } + + /** + * Records a value in the histogram for statistical analysis. The recorded value will be + * included in subsequent percentile calculations and distribution snapshot generated by + * {@link #getPercentileHistogramDistribution()}. <br/> + * <br/> + * Values exceeding the maximum trackable limit (specified during histogram construction) are + * automatically rejected with a warning logged. This prevents histogram corruption while + * maintaining recording performance. + * @param value the value to record, must be within the histogram's trackable range + */ + void addValue(long value) { + if (value > maxUtil) { + // Ignoring recording value more than maximum trackable value. + LOGGER.warn("Histogram recording higher value than maximum. Ignoring it."); + return; + } + recorder.recordValue(value); + } + + /** + * Generates an immutable snapshot containing the percentile distribution of values recorded + * since the last call to this method. 
The snapshot includes statistical metrics such as min/max + * values, operation count, percentile values, and any attached tags. <br/> + * <br/> + * This method captures an interval histogram (values recorded since the previous snapshot) and + * delegates to concrete implementations via {@link #generateDistributionMap(Histogram)} to + * determine which specific percentiles and metrics to include. <br/> + * <br/> + * The returned {@link PercentileHistogramDistribution} is thread-safe and immutable, making it + * suitable for concurrent access by monitoring systems. + * @return an immutable {@link HistogramDistribution} containing percentile analysis and + * statistics + */ + HistogramDistribution getPercentileHistogramDistribution() { + Histogram histogram = this.recorder.getIntervalHistogram(prevHistogram); + HistogramDistribution distribution; + if (tags == null) { + distribution = new PercentileHistogramDistribution(name, histogram.getMinValue(), + histogram.getMaxValue(), histogram.getTotalCount(), + generateDistributionMap(histogram)); + } else { + distribution = new PercentileHistogramDistribution(name, histogram.getMinValue(), + histogram.getMaxValue(), histogram.getTotalCount(), + generateDistributionMap(histogram), tags); + } + this.prevHistogram = histogram; + return distribution; + } + + /** + * Attaches a metadata tag to the histogram as a key-value pair. Tags provide additional context + * about the histogram data and are included in all distribution snapshots generated by + * {@link #getPercentileHistogramDistribution()}. <br/> + * <br/> + * Tags are commonly used for dimensional monitoring, allowing metrics to be filtered and + * grouped by tag names. Multiple tags can be added to the same histogram, and duplicate keys + * will overwrite previous values. 
+ * @param key the tag key + * @param value the tag value + */ + void addTag(String key, String value) { + if (tags == null) { + tags = new HashMap<>(); + } + tags.put(key, value); + } + + /** + * Generates a map of percentile distribution where the key is the percentile name (e.g., + * "_90th_percentile", "_95th_percentile", "_median", etc.) and the value is the actual + * percentile value from the histogram snapshot. This method is for internal use only and is + * called from {@link #getPercentileHistogramDistribution()}. + * @param snapshot the snapshot of the {@link Histogram} + * @return a map of percentile distribution with percentile names as keys and their + * corresponding values + */ + protected abstract Map<String, Long> generateDistributionMap(Histogram snapshot); +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogramDistribution.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogramDistribution.java new file mode 100644 index 0000000000..5c81d9c5da --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/PercentileHistogramDistribution.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import java.util.Collections; +import java.util.Map; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +/** + * <b>External User-Facing API</b> + * <p> + * An immutable snapshot of percentile distribution data captured by a {@link PercentileHistogram}. + * This class provides a consumer-friendly interface to access histogram statistics without exposing + * the underlying {@link org.HdrHistogram.Histogram} instances. <br/> + * <br/> + * This class contains: + * <ul> + * <li>Percentile distribution map with percentile names as keys and their values</li> + * <li>Basic statistics (minimum, maximum, total count)</li> + * <li>Optional tags for additional metadata</li> + * </ul> + * <br/> + * Use {@link PercentileHistogram#getPercentileHistogramDistribution()} to get the percentile + * distribution captured by the histogram. Use {@link #getTags()} to access any tags attached to the + * histogram. <br/> + * <br/> + * All data in this class is immutable after construction, making it safe for concurrent access. 
+ */ +public class PercentileHistogramDistribution extends HistogramDistributionImpl { + private final ImmutableMap<String, Long> percentileDistributionMap; + private ImmutableMap<String, String> tags = null; + + PercentileHistogramDistribution(String histoName, long min, long max, long count, + Map<String, Long> percentileDistributionMap) { + super(histoName, min, max, count, Collections.emptyMap()); + this.percentileDistributionMap = ImmutableMap.copyOf(percentileDistributionMap); + } + + PercentileHistogramDistribution(String histoName, long min, long max, long count, + Map<String, Long> percentileDistributionMap, + Map<String, String> tags) { + this(histoName, min, max, count, percentileDistributionMap); + this.tags = ImmutableMap.copyOf(tags); + } + + /** + * Returns an immutable map containing the percentile distribution and statistical data captured + * by the histogram. The map contains metric names as keys and their corresponding values. <br/> + * <br/> + * The map includes: + * <ul> + * <li>Percentile values (e.g., "_90th_percentile", "_95th_percentile", "_median")</li> + * <li>Statistical metrics (e.g., "_min", "_max", "_num_ops")</li> + * </ul> + * <br/> + * This is the primary method for accessing percentile analysis results. The specific + * percentiles and statistics included depend on the concrete {@link PercentileHistogram} + * implementation that generated this distribution. <br/> + * <br/> + * The returned map is immutable and safe for concurrent access. + * @return an immutable map of metric names to their calculated values + */ + public ImmutableMap<String, Long> getPercentileDistributionMap() { + return percentileDistributionMap; + } + + public Map<String, Long> getRangeDistributionMap() { + throw new UnsupportedOperationException("Range Histogram Distribution is not supported!!"); + } + + /** + * Returns the metadata tags associated with this histogram distribution. 
Tags provide + * additional context about the histogram data and are commonly used for dimensional monitoring, + * allowing metrics to be filtered and grouped by tag names. <br/> + * <br/> + * Tags are attached to the histogram using {@link PercentileHistogram#addTag(String, String)} + * before generating the distribution snapshot. <br/> + * <br/> + * The returned map is immutable and safe for concurrent access. + * @return an immutable map of tag key-value pairs, or an empty map if no tags were attached + */ + public ImmutableMap<String, String> getTags() { + return tags == null ? ImmutableMap.of() : tags; + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/UtilizationHistogram.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/UtilizationHistogram.java new file mode 100644 index 0000000000..bc136fec3f --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/UtilizationHistogram.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.monitoring; + +import java.util.HashMap; +import java.util.Map; + +import org.HdrHistogram.Histogram; + +/** + * A concrete implementation of {@link PercentileHistogram} specifically designed for tracking + * utilization metrics. This histogram captures and provides a comprehensive set of statistical + * metrics including percentile distributions, operation counts, and min/max values. <br/> + * <br/> + * <p> + * <b>Internal Use Only:</b> This class is for internal use only and should not be used directly by + * users of Phoenix. + * </p> + * <p> + * The histogram generates the following metrics: + * </p> + * <ul> + * <li>Number of operations (total count)</li> + * <li>Minimum and maximum recorded values</li> + * <li>25th, 50th (median), 75th, 90th, and 95th percentiles</li> + * </ul> + */ +class UtilizationHistogram extends PercentileHistogram { + + UtilizationHistogram(long maxUtil, String name) { + super(maxUtil, name); + } + + protected Map<String, Long> generateDistributionMap(Histogram snapshot) { + Map<String, Long> metrics = new HashMap<>(); + metrics.put(NUM_OPS_METRIC_NAME, snapshot.getTotalCount()); + metrics.put(MIN_METRIC_NAME, snapshot.getMinValue()); + metrics.put(MAX_METRIC_NAME, snapshot.getMaxValue()); + metrics.put(TWENTY_FIFTH_PERCENTILE_METRIC_NAME, snapshot.getValueAtPercentile(25)); + metrics.put(MEDIAN_METRIC_NAME, snapshot.getValueAtPercentile(50)); + metrics.put(SEVENTY_FIFTH_PERCENTILE_METRIC_NAME, + snapshot.getValueAtPercentile(75)); + metrics.put(NINETIETH_PERCENTILE_METRIC_NAME, snapshot.getValueAtPercentile(90)); + metrics.put(NINETY_FIFTH_PERCENTILE_METRIC_NAME, snapshot.getValueAtPercentile(95)); + return metrics; + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 6161732357..b2c7f3b4e3 100644 --- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -78,6 +78,7 @@ import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAIL import static org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_SUCCESS; import static org.apache.phoenix.monitoring.MetricType.TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS; import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_VIEW_TTL_ENABLED; @@ -144,6 +145,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.regex.Pattern; import javax.annotation.concurrent.GuardedBy; @@ -243,14 +245,18 @@ import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.iterate.TableResultIterator; import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus; +import org.apache.phoenix.jdbc.AbstractRPCConnectionInfo; import org.apache.phoenix.jdbc.ConnectionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.RPCConnectionInfo; +import org.apache.phoenix.jdbc.ZKConnectionInfo; +import org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats; import org.apache.phoenix.log.ConnectionLimiter; import org.apache.phoenix.log.DefaultConnectionLimiter; 
import org.apache.phoenix.log.LoggingConnectionLimiter; import org.apache.phoenix.log.QueryLoggerDisruptor; +import org.apache.phoenix.monitoring.HTableThreadPoolHistograms; import org.apache.phoenix.monitoring.TableMetricsManager; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; @@ -511,16 +517,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // Based on implementations used in // org.apache.hadoop.hbase.client.ConnectionImplementation final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(maxQueue); + Supplier<HTableThreadPoolHistograms> hTableThreadPoolHistogramsSupplier = + getThreadPoolHistogramsSupplier(maxThreads, maxQueue); this.threadPoolExecutor = - new ThreadPoolExecutor(corePoolSize, maxThreads, keepAlive, TimeUnit.SECONDS, - workQueue, new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("CQSI-" + threadPoolName - + "-" + threadPoolNumber.incrementAndGet() - + "-shared-pool-%d") - .setUncaughtExceptionHandler( - Threads.LOGGING_EXCEPTION_HANDLER) - .build()); + new HTableThreadPoolWithUtilizationStats(corePoolSize, maxThreads, keepAlive, + TimeUnit.SECONDS, workQueue, new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("CQSI-" + threadPoolName + + "-" + threadPoolNumber.incrementAndGet() + + "-shared-pool-%d") + .setUncaughtExceptionHandler( + Threads.LOGGING_EXCEPTION_HANDLER) + .build(), connectionInfo.toUrl(), hTableThreadPoolHistogramsSupplier); this.threadPoolExecutor.allowCoreThreadTimeOut(finalConfig .getBoolean(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT)); @@ -642,6 +650,34 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement .getMetadataCachingSource(); } + private Supplier<HTableThreadPoolHistograms> getThreadPoolHistogramsSupplier( + int maxThreadPoolSize, int maxQueueSize) { + if (this.config.getBoolean(CQSI_THREAD_POOL_METRICS_ENABLED, + 
DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED)) { + return new Supplier<HTableThreadPoolHistograms>() { + @Override + public HTableThreadPoolHistograms get() { + HTableThreadPoolHistograms hTableThreadPoolHistograms = + new HTableThreadPoolHistograms(maxThreadPoolSize, maxQueueSize); + if (connectionInfo instanceof ZKConnectionInfo) { + hTableThreadPoolHistograms.addServerTag( + ((ZKConnectionInfo) connectionInfo).getZkHosts()); + } else if (connectionInfo instanceof AbstractRPCConnectionInfo) { + hTableThreadPoolHistograms.addServerTag( + ((AbstractRPCConnectionInfo) connectionInfo).getBoostrapServers()); + } else { + throw new IllegalStateException("Unexpected connection info type!!"); + } + String cqsiName = connectionInfo.getPrincipal(); + hTableThreadPoolHistograms.addCqsiNameTag(cqsiName != null + ? cqsiName : DEFAULT_QUERY_SERVICES_NAME); + return hTableThreadPoolHistograms; + } + }; + } + return null; + } + private Connection openConnection(Configuration conf) throws SQLException { Connection localConnection; try { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index dc0fab5a2c..e16e716958 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -510,6 +510,8 @@ public interface QueryServices extends SQLCloseable { */ String TESTS_MINI_CLUSTER_NUM_REGION_SERVERS = "phoenix.tests.minicluster.numregionservers"; + String TESTS_MINI_CLUSTER_NUM_MASTERS = "phoenix.tests.minicluster.nummasters"; + /** * Config to inject any processing after the client retrieves dummy result from the server. 
@@ -553,6 +555,8 @@ public interface QueryServices extends SQLCloseable { String PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT = "phoenix.streams.get.table.regions.timeout"; + String CQSI_THREAD_POOL_METRICS_ENABLED = "phoenix.cqsi.thread.pool.metrics.enabled"; + /** * Get executor service used for parallel scans */ diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index d7da58f3c2..bb5ef67e49 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -36,6 +36,7 @@ import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_ME import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_ENABLED; import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_PUBLISHER_CLASSNAME; import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_METRICS_ENABLED; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB; import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK; @@ -467,6 +468,7 @@ public class QueryServicesOptions { public static final int DEFAULT_CQSI_THREAD_POOL_MAX_THREADS = 25; public static final int DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE = 512; public static final Boolean DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true; + public static final Boolean DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED = false; private final Configuration config; @@ -582,7 +584,8 @@ public class QueryServicesOptions { .setIfUnset(CQSI_THREAD_POOL_MAX_THREADS, DEFAULT_CQSI_THREAD_POOL_MAX_THREADS) 
.setIfUnset(CQSI_THREAD_POOL_MAX_QUEUE, DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE) .setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, - DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT); + DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT) + .setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED, DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set @@ -823,6 +826,15 @@ public class QueryServicesOptions { .getBoolean(TABLE_LEVEL_METRICS_ENABLED, DEFAULT_IS_TABLE_LEVEL_METRICS_ENABLED); } + public boolean isCQSIThreadPoolMetricsEnabled() { + return config.getBoolean(CQSI_THREAD_POOL_METRICS_ENABLED, + DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED); + } + + public void setCQSIThreadPoolMetricsEnabled(boolean enabled) { + config.setBoolean(CQSI_THREAD_POOL_METRICS_ENABLED, enabled); + } + public void setTableLevelMetricsEnabled() { set(TABLE_LEVEL_METRICS_ENABLED, true); } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index 88ad2e36ef..b5f98ce457 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Pair; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.coprocessorclient.MetaDataProtocol; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.OrderByExpression; @@ -67,12 +66,12 @@ import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric; import 
org.apache.phoenix.monitoring.GlobalClientMetrics; import org.apache.phoenix.monitoring.GlobalMetric; +import org.apache.phoenix.monitoring.HTableThreadPoolMetricsManager; import org.apache.phoenix.monitoring.HistogramDistribution; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.monitoring.PhoenixTableMetric; import org.apache.phoenix.monitoring.TableMetricsManager; import org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesMetricsManager; -import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.AmbiguousColumnException; @@ -1417,6 +1416,53 @@ public class PhoenixRuntime { return TableMetricsManager.getSizeHistogramsForAllTables(); } + /** + * Retrieves comprehensive HTable thread pool utilization and contention metrics collected + * across all monitored HTable thread pools. + * <p> + * This method provides access to detailed performance histograms that track two critical thread + * pool metrics: + * </p> + * <ul> + * <li><b>Active Threads Count</b> - Distribution of the number of threads actively executing + * tasks</li> + * <li><b>Queue Size</b> - Distribution of the number of tasks waiting in thread pool + * queues</li> + * </ul> + * <p> + * The metrics are automatically collected by + * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}, a specialized + * ThreadPoolExecutor that instruments HTable operations. These statistics help identify thread + * pool bottlenecks, optimize thread pool configurations, and monitor overall HTable performance + * characteristics. 
+ * </p> + * <p> + * <b>Metric Sources:</b> + * </p> + * <ul> + * <li><b>CQSI Thread Pools</b> - Internal Phoenix CQSI-level thread pools (identified by + * connection URL)</li> + * <li><b>External Thread Pools</b> - User-defined HTable thread pools created with + * {@link org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats}</li> + * </ul> + * <p> + * Each histogram includes percentile distributions (P50, P90, P95, P99, etc.), min/max values, + * and operation counts. The returned map is keyed by histogram identifier and contains + * dimensional tags for enhanced monitoring capabilities. + * </p> + * @return a map where keys are histogram identifiers (connection URLs for CQSI pools, or custom + * names for external pools) and values are lists of {@link HistogramDistribution} + * instances containing comprehensive utilization statistics. Returns an empty map if no + * HTable thread pools have been monitored or if metrics collection is disabled. + * @see org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats + * @see HTableThreadPoolMetricsManager + * @see org.apache.phoenix.monitoring.HTableThreadPoolHistograms + * @see HistogramDistribution + */ + public static Map<String, List<HistogramDistribution>> getHTableThreadPoolHistograms() { + return HTableThreadPoolMetricsManager.getHistogramsForAllThreadPools(); + } + public static Map<String, List<HistogramDistribution>> getAllConnectionQueryServicesHistograms() { return ConnectionQueryServicesMetricsManager.getHistogramsForAllConnectionQueryServices(); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java index e3a805c77b..32d011cd9c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionIT.java @@ -42,12 +42,14 @@ import static 
org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED; import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS; import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_QUEUE; import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_MAX_THREADS; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_METRICS_ENABLED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; +import static org.mockito.Mockito.doAnswer; import java.lang.reflect.Field; import java.sql.Connection; @@ -58,6 +60,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.lang3.RandomStringUtils; @@ -71,13 +74,15 @@ import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilit import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.monitoring.GlobalMetric; +import org.apache.phoenix.monitoring.HTableThreadPoolHistograms; +import org.apache.phoenix.monitoring.HTableThreadPoolMetricsManager; +import org.apache.phoenix.monitoring.HistogramDistribution; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.ConnectionQueryServicesImpl; -import org.apache.phoenix.query.HBaseFactoryProvider; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.util.JDBCUtil; import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -86,8 +91,7 
@@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,8 +129,9 @@ public class ParallelPhoenixConnectionIT { GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_CORE_POOL_SIZE, String.valueOf(17)); GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_THREADS, String.valueOf(19)); GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_MAX_QUEUE, String.valueOf(23)); - GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, String.valueOf(true)); - + GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, + String.valueOf(true)); + GLOBAL_PROPERTIES.setProperty(CQSI_THREAD_POOL_METRICS_ENABLED, String.valueOf(true)); } @AfterClass @@ -226,6 +231,107 @@ public class ParallelPhoenixConnectionIT { } } + @Test + public void testCqsiThreadPoolMetricsForParallelConnection() throws Exception { + try (Connection conn = getParallelConnection()) { + ParallelPhoenixConnection pr = conn.unwrap(ParallelPhoenixConnection.class); + + // Get details of connection#1 + PhoenixConnection pConn1 = pr.getFutureConnection1().get(); + Configuration config1 = pConn1.getQueryServices().getConfiguration(); + String zkQuorum1 = config1.get(HConstants.ZOOKEEPER_QUORUM); + String principal1 = config1.get(QueryServices.QUERY_SERVICES_NAME); + + // Get details of connection#2 + PhoenixConnection pConn2 = pr.getFutureConnection2().get(); + Configuration config2 = pConn2.getQueryServices().getConfiguration(); + String zkQuorum2 = config2.get(HConstants.ZOOKEEPER_QUORUM); + String principal2 = config2.get(QueryServices.QUERY_SERVICES_NAME); + + // Slow down connection#1 + CountDownLatch latch = new CountDownLatch(1); + slowDownConnection(pr, pr.getFutureConnection1(), "futureConnection1", latch); + + try (Statement stmt = 
conn.createStatement()) { + HTableThreadPoolMetricsManager.getHistogramsForAllThreadPools(); + try (ResultSet rs = + stmt.executeQuery(String.format("SELECT COUNT(*) FROM %s", tableName))) { + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + assertFalse(rs.next()); + } + + Map<String, List<HistogramDistribution>> htableHistograms = + HTableThreadPoolMetricsManager.getHistogramsForAllThreadPools(); + + // Assert connection#1 CQSI thread pool metrics + String conn1HistogramKey = getHistogramKey(config1); + assertHTableThreadPoolHistograms(htableHistograms.get(conn1HistogramKey), + conn1HistogramKey, false, zkQuorum1, principal1); + + // Assert connection#2 CQSI thread pool metrics + String conn2HistogramKey = getHistogramKey(config2); + assertHTableThreadPoolHistograms(htableHistograms.get(conn2HistogramKey), + conn2HistogramKey, true, zkQuorum2, principal2); + + // Assert that the CQSI thread pool metrics for both connections are different + Assert.assertNotEquals(conn1HistogramKey, conn2HistogramKey); + } finally { + latch.countDown(); + } + } + } + + private void slowDownConnection(ParallelPhoenixConnection pr, + CompletableFuture<PhoenixConnection> pConn, String futureConnectionField, + CountDownLatch latch) throws Exception { + Assert.assertTrue(futureConnectionField.equals("futureConnection1") + || futureConnectionField.equals("futureConnection2")); + + PhoenixConnection spy = Mockito.spy(pConn.get()); + doAnswer((invocation) -> { + // Block the statement creation until the latch is counted down + latch.await(); + return invocation.callRealMethod(); + }).when(spy).createStatement(); + + // Replace the existing CompletableFuture with the spied CompletableFuture + Field futureField = ParallelPhoenixConnection.class.getDeclaredField(futureConnectionField); + futureField.setAccessible(true); + CompletableFuture<PhoenixConnection> spiedFuture = CompletableFuture.completedFuture(spy); + futureField.set(pr, spiedFuture); + + // Verify that the spied 
CompletableFuture has been setup correctly + if (futureConnectionField.equals("futureConnection1")) { + Assert.assertSame(spy, pr.getFutureConnection1().get()); + } else { + Assert.assertSame(spy, pr.getFutureConnection2().get()); + } + } + + private String getHistogramKey(Configuration config) throws SQLException { + String url = + QueryUtil.getConnectionUrl(clientProperties, config, HBaseTestingUtilityPair.PRINCIPAL); + return ConnectionInfo.createNoLogin(url, null, null).toUrl(); + } + + private void assertHTableThreadPoolHistograms(List<HistogramDistribution> histograms, + String histogramKey, boolean isUsed, String zkQuorum, String principal) { + Assert.assertNotNull(histograms); + assertEquals(2, histograms.size()); + for (HistogramDistribution histogram : histograms) { + if (isUsed) { + assertTrue(histogram.getCount() > 0); + } else { + assertEquals(0, histogram.getCount()); + } + assertEquals(zkQuorum, + histogram.getTags().get(HTableThreadPoolHistograms.Tag.servers.name())); + assertEquals(principal, + histogram.getTags().get(HTableThreadPoolHistograms.Tag.cqsiName.name())); + } + } + /** * Test Phoenix connection creation and basic operations with HBase cluster(s) unavailable. */ diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BaseHTableThreadPoolMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BaseHTableThreadPoolMetricsIT.java new file mode 100644 index 0000000000..0d73dabdda --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BaseHTableThreadPoolMetricsIT.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.Assert; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Map; + +import static org.apache.phoenix.monitoring.HTableThreadPoolHistograms.HistogramName; + +public class BaseHTableThreadPoolMetricsIT extends BaseTest { + private static final int ROWS_TO_LOAD_INITIALLY = 10000; + + protected void assertHistogramTags( + Map<String, List<HistogramDistribution>> htableThreadPoolHistograms, + Map<String, String> expectedTagKeyValues, String histogramKey) throws SQLException { + List<HistogramDistribution> histograms = htableThreadPoolHistograms.get(histogramKey); + Assert.assertEquals(2, histograms.size()); + for (HistogramDistribution histogram : histograms) { + Map<String, String> tags = histogram.getTags(); + + // Assert that there only expected tag names are there + Assert.assertEquals(expectedTagKeyValues.keySet(), tags.keySet()); + + for (Map.Entry<String, String> tag : tags.entrySet()) { + String tagName = tag.getKey(); + Assert.assertEquals(expectedTagKeyValues.get(tagName), tags.get(tagName)); + } + } + } + + protected void assertHTableThreadPoolUsed( + Map<String, List<HistogramDistribution>> htableThreadPoolHistograms, String histogramKey) { + boolean foundActiveThreadsCountHistogram = false; + boolean foundQueueSizeHistogram = false; + + 
List<HistogramDistribution> histograms = htableThreadPoolHistograms.get(histogramKey); + // Assert each HTableThreadPoolHistograms has 2 HdrHistograms + Assert.assertEquals(2, histograms.size()); + for (HistogramDistribution histogram : histograms) { + if (histogram.getHistoName().equals(HistogramName.ActiveThreadsCount.name())) { + + // Assert that at least row count no. of requests were processed by + // HTableThreadPool as we are fetching 1 row per scan RPC + Assert.assertTrue(ROWS_TO_LOAD_INITIALLY + <= histogram.getPercentileDistributionMap().get( + PercentileHistogram.NUM_OPS_METRIC_NAME).intValue()); + foundActiveThreadsCountHistogram = true; + } + else if (histogram.getHistoName().equals(HistogramName.QueueSize.name())) { + foundQueueSizeHistogram = true; + } + } + + // Assert that the HTableThreadPool expected to be used was actually used + Assert.assertTrue(foundActiveThreadsCountHistogram); + Assert.assertTrue(foundQueueSizeHistogram); + } + + protected void assertHTableThreadPoolNotUsed( + Map<String, List<HistogramDistribution>> htableThreadPoolHistograms, + String histogramKey) { + boolean foundActiveThreadsCountHistogram = false; + boolean foundQueueSizeHistogram = false; + List<HistogramDistribution> histograms = htableThreadPoolHistograms.get(histogramKey); + Assert.assertEquals(2, histograms.size()); + for (HistogramDistribution histogram : histograms) { + if (histogram.getHistoName().equals(HistogramName.ActiveThreadsCount.name())) { + foundActiveThreadsCountHistogram = true; + } + else if (histogram.getHistoName().equals(HistogramName.QueueSize.name())) { + foundQueueSizeHistogram = true; + } + Assert.assertFalse(histogram.getPercentileDistributionMap().isEmpty()); + for (long value: histogram.getPercentileDistributionMap().values()) { + Assert.assertEquals(0, value); + } + } + Assert.assertTrue(foundActiveThreadsCountHistogram); + Assert.assertTrue(foundQueueSizeHistogram); + } + + protected Map<String, List<HistogramDistribution>> 
runQueryAndGetHistograms(Connection conn, + String tableName) + throws SQLException { + Map<String, List<HistogramDistribution>> htableThreadPoolHistograms; + + try (Statement stmt = conn.createStatement()) { + // Per row submit one task for execution in HTable thread pool + stmt.setFetchSize(1); + try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName)) { + int rowsRead = 0; + // Reset the histograms + PhoenixRuntime.getHTableThreadPoolHistograms(); + while (rs.next()) { + rowsRead++; + } + Assert.assertEquals(ROWS_TO_LOAD_INITIALLY, rowsRead); + htableThreadPoolHistograms = PhoenixRuntime.getHTableThreadPoolHistograms(); + } + } + return htableThreadPoolHistograms; + } + + protected void createTableAndUpsertData(Connection conn, String tableName) throws SQLException { + try (Statement stmt = conn.createStatement()) { + stmt.execute("CREATE TABLE " + tableName + " (k VARCHAR NOT NULL PRIMARY KEY, v " + + "VARCHAR)"); + } + try (PreparedStatement stmt =conn.prepareStatement("UPSERT INTO " + tableName + + " VALUES (?, ?)")) { + for (int i = 1; i <= ROWS_TO_LOAD_INITIALLY; i++) { + stmt.setString(1, "k" + i); + stmt.setString(2, "v" + i); + stmt.executeUpdate(); + } + conn.commit(); + } + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CQSIThreadPoolMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CQSIThreadPoolMetricsIT.java new file mode 100644 index 0000000000..7f69578d24 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CQSIThreadPoolMetricsIT.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.AbstractRPCConnectionInfo; +import org.apache.phoenix.jdbc.ConnectionInfo; +import org.apache.phoenix.jdbc.ZKConnectionInfo; +import org.apache.phoenix.query.ConfigurationFactory; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.InstanceResolver; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static org.apache.phoenix.jdbc.ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_ENABLED; +import static org.apache.phoenix.query.QueryServices.CQSI_THREAD_POOL_METRICS_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_QUERY_SERVICES_NAME; + 
+@Category(NeedsOwnMiniClusterTest.class) +@RunWith(Parameterized.class) +public class CQSIThreadPoolMetricsIT extends BaseHTableThreadPoolMetricsIT { + + private final String registryClassName; + private final Properties props = new Properties(); + + public CQSIThreadPoolMetricsIT(String registryClassName) { + this.registryClassName = registryClassName; + } + + @BeforeClass + public static void setUp() throws Exception { + final Configuration conf = HBaseConfiguration.create(); + setUpConfigForMiniCluster(conf); + conf.setBoolean(CQSI_THREAD_POOL_METRICS_ENABLED, true); + conf.setBoolean(CQSI_THREAD_POOL_ENABLED, true); + + InstanceResolver.clearSingletons(); + // Override to get required config for static fields loaded that require HBase config + InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() { + + @Override public Configuration getConfiguration() { + return conf; + } + + @Override public Configuration getConfiguration(Configuration confToClone) { + Configuration copy = new Configuration(conf); + copy.addResource(confToClone); + return copy; + } + }); + + Map<String, String> props = new HashMap<>(); + props.put(QueryServices.TESTS_MINI_CLUSTER_NUM_MASTERS, "2"); + setUpTestDriver(new ReadOnlyProps(props)); + } + + @Before + public void testCaseSetup() { + props.setProperty(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryClassName); + } + + @After + public void cleanup() throws Exception { + driver.cleanUpCQSICache(); + HTableThreadPoolMetricsManager.clearHTableThreadPoolHistograms(); + props.clear(); + } + + @Parameterized.Parameters(name = "ExternalHTableThreadPoolMetricsIT_registryClassName={0}") + public synchronized static Collection<String> data() { + return Arrays.asList(ZKConnectionInfo.ZK_REGISTRY_NAME, + "org.apache.hadoop.hbase.client.RpcConnectionRegistry", + "org.apache.hadoop.hbase.client.MasterRegistry"); + } + + @Test + public void testHistogramsPerConnInfo() throws Exception { + String tableName = 
generateUniqueName(); + String histogramKey; + String cqsiNameService1 = "service1"; + String cqsiNameService2 = "service2"; + + // Create a connection for "service1" connection profile + String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration(), + cqsiNameService1); + Map<String, List<HistogramDistribution>> htableThreadPoolHistograms; + try (Connection conn = driver.connect(url, props)) { + createTableAndUpsertData(conn, tableName); + + htableThreadPoolHistograms = runQueryAndGetHistograms(conn, tableName); + + histogramKey = getHistogramKey(url); + assertHTableThreadPoolUsed(htableThreadPoolHistograms, histogramKey); + Map<String, String> expectedTagKeyValues = getExpectedTagKeyValues(url, + cqsiNameService1); + assertHistogramTags(htableThreadPoolHistograms, expectedTagKeyValues, histogramKey); + } + + // Create a connection for "service2" connection profile + url = QueryUtil.getConnectionUrl(props, utility.getConfiguration(), cqsiNameService2); + htableThreadPoolHistograms = PhoenixRuntime.getHTableThreadPoolHistograms(); + // Assert that HTableThreadPoolHistograms for service2 is not there yet + Assert.assertNull(htableThreadPoolHistograms.get(getHistogramKey(url))); + try (Connection conn = driver.connect(url, props)) { + htableThreadPoolHistograms = runQueryAndGetHistograms(conn, tableName); + + assertHTableThreadPoolNotUsed(htableThreadPoolHistograms, histogramKey); + Map<String, String> expectedTagKeyValues = getExpectedTagKeyValues(url, + cqsiNameService1); + assertHistogramTags(htableThreadPoolHistograms, expectedTagKeyValues, histogramKey); + + histogramKey = getHistogramKey(url); + // We have HTableThreadPoolHistograms for service1 and service2 CQSI instances + assertHTableThreadPoolUsed(htableThreadPoolHistograms, histogramKey); + expectedTagKeyValues = getExpectedTagKeyValues(url, + cqsiNameService2); + assertHistogramTags(htableThreadPoolHistograms, expectedTagKeyValues, histogramKey); + } + } + + @Test + public void 
testCQSIThreadPoolHistogramsDisabled() throws Exception { + String tableName = generateUniqueName(); + String cqsiName = "service1"; + props.setProperty(CQSI_THREAD_POOL_METRICS_ENABLED, "false"); + props.setProperty(CQSI_THREAD_POOL_ENABLED, "true"); + String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration(), cqsiName); + try (Connection conn = driver.connect(url, props)) { + createTableAndUpsertData(conn, tableName); + + Map<String, List<HistogramDistribution>> htableThreadPoolHistograms = + runQueryAndGetHistograms(conn, tableName); + String histogramKey = getHistogramKey(url); + Assert.assertNull(htableThreadPoolHistograms.get(histogramKey)); + } + } + + @Test + public void testDefaultCQSIHistograms() throws Exception { + String tableName = generateUniqueName(); + + String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration()); + Map<String, List<HistogramDistribution>> htableThreadPoolHistograms; + try (Connection conn = driver.connect(url, props)) { + createTableAndUpsertData(conn, tableName); + + htableThreadPoolHistograms = runQueryAndGetHistograms(conn, tableName); + + String histogramKey = getHistogramKey(url); + assertHTableThreadPoolUsed(htableThreadPoolHistograms, histogramKey); + Map<String, String> expectedTagKeyValues = getExpectedTagKeyValues(url, + DEFAULT_QUERY_SERVICES_NAME); + assertHistogramTags(htableThreadPoolHistograms, expectedTagKeyValues, histogramKey); + } + } + + @Test + public void testThreadPoolHistogramsSharedAcrossCQSIWithSameConnInfo() throws Exception { + String tableName = generateUniqueName(); + String histogramKey; + String cqsiName = "service1"; + + // Create a connection for "service1" connection profile + String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration(), cqsiName); + Map<String, List<HistogramDistribution>> htableThreadPoolHistograms; + try (Connection conn = driver.connect(url, props)) { + createTableAndUpsertData(conn, tableName); + + htableThreadPoolHistograms = 
runQueryAndGetHistograms(conn, tableName); + + histogramKey = getHistogramKey(url); + assertHTableThreadPoolUsed(htableThreadPoolHistograms, histogramKey); + Map<String, String> expectedTagKeyValues = getExpectedTagKeyValues(url, cqsiName); + assertHistogramTags(htableThreadPoolHistograms, expectedTagKeyValues, histogramKey); + } + + driver.cleanUpCQSICache(); + try (Connection conn = driver.connect(url, props)) { + htableThreadPoolHistograms = runQueryAndGetHistograms(conn, tableName); + // Assert that no new HTableThreadPoolHistograms instance was created for a new CQSI + // instance + String histogramKeyForDefaultCQSI = getHistogramKey(QueryUtil.getConnectionUrl( + new Properties(), utility.getConfiguration())); + Set<String> histogramKeySet = + new HashSet<>(Arrays.asList(histogramKeyForDefaultCQSI, histogramKey)); + Assert.assertTrue(histogramKeySet.containsAll(htableThreadPoolHistograms.keySet())); + assertHTableThreadPoolUsed(htableThreadPoolHistograms, histogramKey); + Map<String, String> expectedTagKeyValues = getExpectedTagKeyValues(url, cqsiName); + assertHistogramTags(htableThreadPoolHistograms, expectedTagKeyValues, histogramKey); + } + } + + private String getHistogramKey(String url) throws SQLException { + return ConnectionInfo.createNoLogin(url, null, null).toUrl(); + } + + private Map<String, String> getExpectedTagKeyValues(String url, String cqsiName) + throws SQLException { + Map<String, String> expectedTagKeyValues = new HashMap<>(); + ConnectionInfo connInfo = ConnectionInfo.createNoLogin(url, null, null); + if (registryClassName.equals(ZKConnectionInfo.ZK_REGISTRY_NAME)) { + expectedTagKeyValues.put(HTableThreadPoolHistograms.Tag.servers.name(), + ((ZKConnectionInfo) connInfo).getZkHosts()); + } + else { + expectedTagKeyValues.put(HTableThreadPoolHistograms.Tag.servers.name(), + ((AbstractRPCConnectionInfo) connInfo).getBoostrapServers()); + } + expectedTagKeyValues.put(HTableThreadPoolHistograms.Tag.cqsiName.name(), cqsiName); + return 
expectedTagKeyValues; + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/ExternalHTableThreadPoolMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/ExternalHTableThreadPoolMetricsIT.java new file mode 100644 index 0000000000..8a043b5572 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/ExternalHTableThreadPoolMetricsIT.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.monitoring; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.ZKConnectionInfo; +import org.apache.phoenix.job.HTableThreadPoolWithUtilizationStats; +import org.apache.phoenix.query.ConfigurationFactory; +import org.apache.phoenix.query.HTableFactory; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.phoenix.util.InstanceResolver; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.sql.Connection; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static org.apache.phoenix.jdbc.ConnectionInfo.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; + +@Category(NeedsOwnMiniClusterTest.class) +@RunWith(Parameterized.class) +public class ExternalHTableThreadPoolMetricsIT extends BaseHTableThreadPoolMetricsIT{ + private static final int MAX_THREADS_IN_EXTERNAL_THREAD_POOL = 10; + private static final int 
QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL = 100; + private static final String TAG_NAME = "cluster"; + private static final Map<String, String> tagValues = new HashMap<>(); + private static final String THREAD_POOL_1A = "external_thread_pool_1".toUpperCase(); + private static final String THREAD_POOL_2A = "external_thread_pool_2".toUpperCase(); + private static final String HISTOGRAM_DISABLED_THREAD_POOL = + "histogram_disabled_thread_pool".toUpperCase(); + private static final String NULL_SUPPLIER_THREAD_POOL = "null_supplier_thread_pool".toUpperCase(); + private static final String NO_TAGS_THREAD_POOL = "no_tags_thread_pool".toUpperCase(); + + private final String registryClassName; + private final Properties props = new Properties(); + + public ExternalHTableThreadPoolMetricsIT(String registryClassName) { + this.registryClassName = registryClassName; + } + + private static ThreadPoolExecutor createThreadPoolExecutor(String threadPoolName) { + Supplier<HTableThreadPoolHistograms> supplier = + getHTableThreadPoolHistogramsSupplier(threadPoolName); + BlockingQueue<Runnable> workQueue = + new LinkedBlockingQueue<>(QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL); + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat(threadPoolName + "-shared-pool-%d") + .setUncaughtExceptionHandler( + Threads.LOGGING_EXCEPTION_HANDLER) + .build(); + return new HTableThreadPoolWithUtilizationStats( + MAX_THREADS_IN_EXTERNAL_THREAD_POOL, MAX_THREADS_IN_EXTERNAL_THREAD_POOL, + 30, TimeUnit.SECONDS, workQueue, threadFactory, + threadPoolName, supplier) { + @Override + public void execute(Runnable command) { + super.execute(command); + } + }; + } + + private static Supplier<HTableThreadPoolHistograms> getHTableThreadPoolHistogramsSupplier( + String threadPoolName) { + Supplier<HTableThreadPoolHistograms> supplier; + if (threadPoolName.equals(HISTOGRAM_DISABLED_THREAD_POOL)) { + supplier = null; + } + else if (threadPoolName.equals(NULL_SUPPLIER_THREAD_POOL)) 
{ + supplier = new Supplier<HTableThreadPoolHistograms>() { + @Override + public HTableThreadPoolHistograms get() { + return null; + } + }; + } + else if (threadPoolName.equals(NO_TAGS_THREAD_POOL)) { + supplier = new Supplier<HTableThreadPoolHistograms>() { + @Override + public HTableThreadPoolHistograms get() { + return new HTableThreadPoolHistograms(MAX_THREADS_IN_EXTERNAL_THREAD_POOL, + QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL); + } + }; + } + else { + supplier = new Supplier<HTableThreadPoolHistograms>() { + @Override + public HTableThreadPoolHistograms get() { + HTableThreadPoolHistograms hTableThreadPoolHistograms = + new HTableThreadPoolHistograms(MAX_THREADS_IN_EXTERNAL_THREAD_POOL, + QUEUE_CAPACITY_OF_EXTERNAL_THREAD_POOL); + hTableThreadPoolHistograms.addTag(TAG_NAME, tagValues.get(threadPoolName)); + return hTableThreadPoolHistograms; + } + }; + } + return supplier; + } + + @BeforeClass + public static void setUp() throws Exception { + tagValues.put(THREAD_POOL_1A, "hbase1a"); + tagValues.put(THREAD_POOL_2A, "hbase2a"); + + final Configuration conf = HBaseConfiguration.create(); + setUpConfigForMiniCluster(conf); + + InstanceResolver.clearSingletons(); + // Override to get required config for static fields loaded that require HBase config + InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() { + + @Override public Configuration getConfiguration() { + return conf; + } + + @Override public Configuration getConfiguration(Configuration confToClone) { + Configuration copy = new Configuration(conf); + copy.addResource(confToClone); + return copy; + } + }); + ThreadPoolExecutor executorFor1a = createThreadPoolExecutor(THREAD_POOL_1A); + ThreadPoolExecutor executorFor2a = createThreadPoolExecutor(THREAD_POOL_2A); + ThreadPoolExecutor histogramDisabledExecutor = + createThreadPoolExecutor(HISTOGRAM_DISABLED_THREAD_POOL); + ThreadPoolExecutor nullSupplierExecutor = + createThreadPoolExecutor(NULL_SUPPLIER_THREAD_POOL); + 
ThreadPoolExecutor defaultExecutor = createThreadPoolExecutor(NO_TAGS_THREAD_POOL); + InstanceResolver.getSingleton(HTableFactory.class, new HTableFactory.HTableFactoryImpl() { + @Override + public Table getTable(byte[] tableName, + org.apache.hadoop.hbase.client.Connection connection, + ExecutorService pool) throws IOException { + if (Bytes.toString(tableName).startsWith(HISTOGRAM_DISABLED_THREAD_POOL)) { + return super.getTable(tableName, connection, histogramDisabledExecutor); + } + else if (Bytes.toString(tableName).startsWith(NULL_SUPPLIER_THREAD_POOL)) { + return super.getTable(tableName, connection, nullSupplierExecutor); + } + else if (Bytes.toString(tableName).startsWith(THREAD_POOL_1A)) { + return super.getTable(tableName, connection, executorFor1a); + } + else if (Bytes.toString(tableName).startsWith(THREAD_POOL_2A)) { + return super.getTable(tableName, connection, executorFor2a); + } + else { + return super.getTable(tableName, connection, defaultExecutor); + } + } + }); + + Map<String, String> props = new HashMap<>(); + props.put(QueryServices.TESTS_MINI_CLUSTER_NUM_MASTERS, "2"); + setUpTestDriver(new ReadOnlyProps(props)); + } + + @Before + public void testCaseSetup() { + props.setProperty(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryClassName); + } + + @After + public void cleanup() throws Exception { + HTableThreadPoolMetricsManager.clearHTableThreadPoolHistograms(); + props.clear(); + } + + @Parameterized.Parameters(name = "ExternalHTableThreadPoolMetricsIT_registryClassName={0}") + public synchronized static Collection<String> data() { + return Arrays.asList(ZKConnectionInfo.ZK_REGISTRY_NAME, + "org.apache.hadoop.hbase.client.RpcConnectionRegistry", + "org.apache.hadoop.hbase.client.MasterRegistry"); + } + + @Test + public void testHistogramsPerHTableThreadPool() throws Exception { + String tableName = THREAD_POOL_1A + "." 
+ generateUniqueName(); + + Map<String, List<HistogramDistribution>> htableThreadPoolHistograms; + String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration()); + + // Send traffic to HTable thread pool for hbase1a + try (Connection conn = driver.connect(url, props)) { + createTableAndUpsertData(conn, tableName); + + htableThreadPoolHistograms = runQueryAndGetHistograms(conn, tableName); + assertHTableThreadPoolUsed(htableThreadPoolHistograms, THREAD_POOL_1A); + Assert.assertNull(htableThreadPoolHistograms.get(THREAD_POOL_2A)); + + Map<String, String> expectedTagKeyValues = new HashMap<>(); + expectedTagKeyValues.put(TAG_NAME, tagValues.get(THREAD_POOL_1A)); + assertHistogramTags(htableThreadPoolHistograms, expectedTagKeyValues, THREAD_POOL_1A); + } + + // Send traffic to HTable thread pool for hbase2a + tableName = THREAD_POOL_2A + "." + generateUniqueName(); + try (Connection conn = driver.connect(url, props)) { + createTableAndUpsertData(conn, tableName); + + htableThreadPoolHistograms = runQueryAndGetHistograms(conn, tableName); + + // We will have a HTable thread pool for hbase1a also + assertHTableThreadPoolUsed(htableThreadPoolHistograms, THREAD_POOL_2A); + assertHTableThreadPoolNotUsed(htableThreadPoolHistograms, THREAD_POOL_1A); + + Map<String, String> expectedTagKeyValues = new HashMap<>(); + expectedTagKeyValues.put(TAG_NAME, tagValues.get(THREAD_POOL_1A)); + assertHistogramTags(htableThreadPoolHistograms, expectedTagKeyValues, THREAD_POOL_1A); + + expectedTagKeyValues.put(TAG_NAME, tagValues.get(THREAD_POOL_2A)); + assertHistogramTags(htableThreadPoolHistograms, expectedTagKeyValues, THREAD_POOL_2A); + } + } + + @Test + public void testHistogramDisabled() throws Exception { + String tableName = HISTOGRAM_DISABLED_THREAD_POOL + "." 
+ generateUniqueName(); + + Map<String, List<HistogramDistribution>> htableThreadPoolHistograms; + String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration()); + + try (Connection conn = driver.connect(url, props)) { + createTableAndUpsertData(conn, tableName); + htableThreadPoolHistograms = runQueryAndGetHistograms(conn, tableName); + Assert.assertNull(htableThreadPoolHistograms.get(HISTOGRAM_DISABLED_THREAD_POOL)); + } + } + + @Test + public void testNullHistogramSupplier() throws Exception { + String tableName = NULL_SUPPLIER_THREAD_POOL + "." + generateUniqueName(); + + Map<String, List<HistogramDistribution>> htableThreadPoolHistograms; + String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration()); + + try (Connection conn = driver.connect(url, props)) { + createTableAndUpsertData(conn, tableName); + htableThreadPoolHistograms = runQueryAndGetHistograms(conn, tableName); + Assert.assertNull(htableThreadPoolHistograms.get(NULL_SUPPLIER_THREAD_POOL)); + } + } + + @Test + public void testHistogramsWithoutTags() throws Exception { + String tableName = generateUniqueName(); + Map<String, List<HistogramDistribution>> htableThreadPoolHistograms; + + String url = QueryUtil.getConnectionUrl(props, utility.getConfiguration()); + try (Connection conn = driver.connect(url, props)) { + createTableAndUpsertData(conn, tableName); + htableThreadPoolHistograms = runQueryAndGetHistograms(conn, tableName); + assertHTableThreadPoolUsed(htableThreadPoolHistograms, NO_TAGS_THREAD_POOL); + assertHistogramTags(htableThreadPoolHistograms, new HashMap<>(), NO_TAGS_THREAD_POOL); + } + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java index b5f6d67a80..26c160c16b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java @@ -126,6 
+126,19 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver { } } + public void cleanUpCQSICache() throws SQLException { + lockInterruptibly(LockMode.WRITE); + try { + for (ConnectionQueryServices service : connectionQueryServicesMap.values()) { + service.close(); + } + connectionQueryServicesMap.clear(); + } + finally { + unlock(LockMode.WRITE); + } + } + @GuardedBy("closeLock") private void checkClosed() { if (closed) { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index a888a8ac1a..3e9775e216 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -126,6 +126,7 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.RegionMetrics; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.RegionInfo; @@ -559,8 +560,14 @@ public abstract class BaseTest { utility = new HBaseTestingUtility(conf); try { long startTime = System.currentTimeMillis(); - utility.startMiniCluster(overrideProps.getInt( - QueryServices.TESTS_MINI_CLUSTER_NUM_REGION_SERVERS, NUM_SLAVES_BASE)); + StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder(); + builder.numMasters(overrideProps.getInt(QueryServices.TESTS_MINI_CLUSTER_NUM_MASTERS, + 1)); + int numSlaves = overrideProps.getInt( + QueryServices.TESTS_MINI_CLUSTER_NUM_REGION_SERVERS, NUM_SLAVES_BASE); + builder.numRegionServers(numSlaves); + builder.numDataNodes(numSlaves); + utility.startMiniCluster(builder.build()); long startupTime = System.currentTimeMillis()-startTime; LOGGER.info("HBase minicluster startup complete in {} ms", startupTime); return 
getLocalClusterUrl(utility);