Copilot commented on code in PR #18047:
URL: https://github.com/apache/pinot/pull/18047#discussion_r3018031418
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java:
##########
@@ -85,20 +88,75 @@ public ResourceManager(PinotConfiguration config) {
     ThreadFactory queryRunnerFactory = new TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false,
         CommonConstants.ExecutorService.PINOT_QUERY_RUNNER_NAME_FORMAT);
-    ExecutorService runnerService = Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
-    runnerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
-        runnerService, config, "query runner");
+    _queryRunnerPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
+    ExecutorService runnerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+        _queryRunnerPool, config, "query runner");
     _queryRunners = MoreExecutors.listeningDecorator(runnerService);
     // pqw -> pinot query workers
     ThreadFactory queryWorkersFactory = new TracedThreadFactory(Thread.NORM_PRIORITY, false,
         CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
-    ExecutorService workerService = Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
-    workerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
-        workerService, config, "query worker");
+    _queryWorkerPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
+    ExecutorService workerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+        _queryWorkerPool, config, "query worker");
     _queryWorkers = MoreExecutors.listeningDecorator(workerService);
   }
+  /**
+   * Dynamically resizes the query runner and worker thread pools. Resizing is performed on the underlying
+   * {@link ThreadPoolExecutor} instances, which is transparent to any decorator wrappers.
+   *
+   * @param newRunnerThreads desired number of query runner threads (must be > 0)
+   * @param newWorkerThreads desired number of query worker threads (must be > 0)
+   */
+  public void resizeThreadPools(int newRunnerThreads, int newWorkerThreads) {
+    if (newRunnerThreads <= 0 || newWorkerThreads <= 0) {
+      LOGGER.warn("Invalid thread pool sizes: runnerThreads={}, workerThreads={}. Sizes must be > 0. Skipping resize.",
+          newRunnerThreads, newWorkerThreads);
+      return;
Review Comment:
`resizeThreadPools()` only adjusts the underlying executors and the
counters, but other scheduler/admission components capture the original thread
counts at construction time and won’t be updated. For example,
`PriorityScheduler` snapshots `resourceManager.getNumQueryRunnerThreads()` into
a final `_numRunners` and sizes `_runningQueriesSemaphore` from it, and
`PolicyBasedResourceManager`/`BinaryWorkloadResourceManager` construct
`ResourceLimitPolicy` from the initial `_numQueryWorkerThreads`. As a result,
changing `pinot.query.scheduler.query_runner_threads`/`query_worker_threads` at
runtime may not actually change scheduling concurrency/limits as intended.
Consider adding a resize hook that updates these dependent policies (or moving
the resize handling to `QueryScheduler` so it can update its semaphore and any
policy objects alongside the pools).
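
One way to picture the suggested resize hook is the minimal sketch below. It is illustrative only and is not Pinot code: `ResizableSemaphore` and `ResizeHookSketch` are hypothetical names, and the sketch assumes the admission limit is a plain `java.util.concurrent.Semaphore` sized from the runner thread count. It resizes the pool and the semaphore in the same call so the two cannot drift apart; worker-side policy objects would need an analogous update.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical semaphore whose capacity can change after construction. */
final class ResizableSemaphore extends Semaphore {
  private static final long serialVersionUID = 1L;
  private int _capacity;

  ResizableSemaphore(int permits) {
    super(permits);
    _capacity = permits;
  }

  /** Grows by releasing extra permits; shrinks via the protected reducePermits(). */
  synchronized void resize(int newCapacity) {
    int delta = newCapacity - _capacity;
    if (delta > 0) {
      release(delta);
    } else if (delta < 0) {
      reducePermits(-delta);
    }
    _capacity = newCapacity;
  }
}

/** Hypothetical holder that keeps a runner pool and its admission limit in sync. */
final class ResizeHookSketch {
  private final ThreadPoolExecutor _runnerPool;
  private final ResizableSemaphore _runningQueries;

  ResizeHookSketch(int numRunners) {
    _runnerPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(numRunners);
    _runningQueries = new ResizableSemaphore(numRunners);
  }

  void resize(int newRunners) {
    if (newRunners <= 0) {
      throw new IllegalArgumentException("newRunners must be > 0");
    }
    // Order matters: core size must never exceed max size at any point.
    if (newRunners > _runnerPool.getMaximumPoolSize()) {
      _runnerPool.setMaximumPoolSize(newRunners);
      _runnerPool.setCorePoolSize(newRunners);
    } else {
      _runnerPool.setCorePoolSize(newRunners);
      _runnerPool.setMaximumPoolSize(newRunners);
    }
    // Update the dependent limit alongside the pool, not only the pool itself.
    _runningQueries.resize(newRunners);
  }

  int corePoolSize() {
    return _runnerPool.getCorePoolSize();
  }

  int availablePermits() {
    return _runningQueries.availablePermits();
  }

  void shutdown() {
    _runnerPool.shutdown();
  }
}
```

When shrinking while permits are held, `availablePermits()` can drop below zero until in-flight holders release, so new admissions stay blocked until the running count falls under the new limit.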
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]