hongkunxu commented on code in PR #18047:
URL: https://github.com/apache/pinot/pull/18047#discussion_r3025589772
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java:
##########
@@ -85,20 +102,94 @@ public ResourceManager(PinotConfiguration config) {
ThreadFactory queryRunnerFactory = new
TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false,
CommonConstants.ExecutorService.PINOT_QUERY_RUNNER_NAME_FORMAT);
- ExecutorService runnerService =
Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
- runnerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
- runnerService, config, "query runner");
+ _queryRunnerPool = (ThreadPoolExecutor)
Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
+ ExecutorService runnerService =
ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+ _queryRunnerPool, config, "query runner");
_queryRunners = MoreExecutors.listeningDecorator(runnerService);
// pqw -> pinot query workers
ThreadFactory queryWorkersFactory = new
TracedThreadFactory(Thread.NORM_PRIORITY, false,
CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
- ExecutorService workerService =
Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
- workerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
- workerService, config, "query worker");
+ _queryWorkerPool = (ThreadPoolExecutor)
Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
+ ExecutorService workerService =
ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+ _queryWorkerPool, config, "query worker");
_queryWorkers = MoreExecutors.listeningDecorator(workerService);
}
+ /**
+ * Dynamically resizes the query runner and worker thread pools. Resizing is
performed on the underlying
+ * {@link ThreadPoolExecutor} instances, which is transparent to any
decorator wrappers.
+ *
+ * @param newRunnerThreads desired number of query runner threads (must be
> 0)
+ * @param newWorkerThreads desired number of query worker threads (must be
> 0)
+ */
+ public void resizeThreadPools(int newRunnerThreads, int newWorkerThreads) {
+ if (newRunnerThreads <= 0 || newWorkerThreads <= 0) {
+ LOGGER.warn("Invalid thread pool sizes: runnerThreads={},
workerThreads={}. Sizes must be > 0. Skipping resize.",
+ newRunnerThreads, newWorkerThreads);
+ return;
+ }
+
+ int oldRunnerThreads = _numQueryRunnerThreads;
+ int oldWorkerThreads = _numQueryWorkerThreads;
+
+ if (oldRunnerThreads == newRunnerThreads && oldWorkerThreads ==
newWorkerThreads) {
+ LOGGER.debug("Thread pool sizes unchanged (runner={}, worker={}).
Skipping resize.",
+ newRunnerThreads, newWorkerThreads);
+ return;
+ }
+
+ resizePool(_queryRunnerPool, oldRunnerThreads, newRunnerThreads,
"queryRunner");
+ _numQueryRunnerThreads = newRunnerThreads;
+
+ resizePool(_queryWorkerPool, oldWorkerThreads, newWorkerThreads,
"queryWorker");
+ _numQueryWorkerThreads = newWorkerThreads;
+
+ LOGGER.info("Resized thread pools: runner {} -> {}, worker {} -> {}",
+ oldRunnerThreads, newRunnerThreads, oldWorkerThreads,
newWorkerThreads);
+
+ onThreadPoolsResized(newRunnerThreads, newWorkerThreads);
+ for (ThreadPoolResizeListener listener : _resizeListeners) {
+ listener.onThreadPoolsResized(newRunnerThreads, newWorkerThreads);
+ }
+ }
+
+ /**
+ * Registers a listener to be notified after thread pools are resized.
+ */
+ public void addThreadPoolResizeListener(ThreadPoolResizeListener listener) {
+ _resizeListeners.add(listener);
+ }
+
+ /**
+ * Hook for subclasses to update dependent state (e.g. resource limit
policies) after thread pools are resized.
+ * Called before external listeners are notified.
+ */
+ protected void onThreadPoolsResized(int newRunnerThreads, int
newWorkerThreads) {
+ }
+
+ private static void resizePool(ThreadPoolExecutor pool, int oldSize, int
newSize, String poolName) {
+ if (oldSize == newSize) {
+ return;
+ }
+ // Scale up:
+ // Increase maximumPoolSize first, then corePoolSize.
+ // If corePoolSize is increased first, it may temporarily exceed
maximumPoolSize,
+ // which would cause an IllegalArgumentException.
+ if (newSize > oldSize) {
+ pool.setMaximumPoolSize(newSize);
+ pool.setCorePoolSize(newSize);
+ } else {
+ // Scale down:
+ // Decrease corePoolSize first, then maximumPoolSize.
+ // If maximumPoolSize is decreased first, it may become smaller than
corePoolSize,
+ // which would also cause an IllegalArgumentException.
+ pool.setCorePoolSize(newSize);
+ pool.setMaximumPoolSize(newSize);
+ }
+ LOGGER.info("Resized {} pool: {} -> {}", poolName, oldSize, newSize);
Review Comment:
This is not a frequently performed operation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]