Copilot commented on code in PR #18047:
URL: https://github.com/apache/pinot/pull/18047#discussion_r3018031418


##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java:
##########
@@ -85,20 +88,75 @@ public ResourceManager(PinotConfiguration config) {
     ThreadFactory queryRunnerFactory = new TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false,
         CommonConstants.ExecutorService.PINOT_QUERY_RUNNER_NAME_FORMAT);
 
-    ExecutorService runnerService = Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
-    runnerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
-        runnerService, config, "query runner");
+    _queryRunnerPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
+    ExecutorService runnerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+        _queryRunnerPool, config, "query runner");
     _queryRunners = MoreExecutors.listeningDecorator(runnerService);
 
     // pqw -> pinot query workers
     ThreadFactory queryWorkersFactory = new TracedThreadFactory(Thread.NORM_PRIORITY, false,
         CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
-    ExecutorService workerService = Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
-    workerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
-        workerService, config, "query worker");
+    _queryWorkerPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
+    ExecutorService workerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+        _queryWorkerPool, config, "query worker");
     _queryWorkers = MoreExecutors.listeningDecorator(workerService);
   }
 
+  /**
+   * Dynamically resizes the query runner and worker thread pools. Resizing is performed on the underlying
+   * {@link ThreadPoolExecutor} instances, which is transparent to any decorator wrappers.
+   *
+   * @param newRunnerThreads desired number of query runner threads (must be > 0)
+   * @param newWorkerThreads desired number of query worker threads (must be > 0)
+   */
+  public void resizeThreadPools(int newRunnerThreads, int newWorkerThreads) {
+    if (newRunnerThreads <= 0 || newWorkerThreads <= 0) {
+      LOGGER.warn("Invalid thread pool sizes: runnerThreads={}, workerThreads={}. Sizes must be > 0. Skipping resize.",
+          newRunnerThreads, newWorkerThreads);
+      return;

Review Comment:
   `resizeThreadPools()` only adjusts the underlying executors and the 
counters, but other scheduler/admission components capture the original thread 
counts at construction time and won’t be updated. For example, 
`PriorityScheduler` snapshots `resourceManager.getNumQueryRunnerThreads()` into 
a final `_numRunners` and sizes `_runningQueriesSemaphore` from it, and 
`PolicyBasedResourceManager`/`BinaryWorkloadResourceManager` construct 
`ResourceLimitPolicy` from the initial `_numQueryWorkerThreads`. As a result, 
changing `pinot.query.scheduler.query_runner_threads`/`query_worker_threads` at 
runtime may not actually change scheduling concurrency/limits as intended. 
Consider adding a resize hook that updates these dependent policies (or moving 
the resize handling to `QueryScheduler` so it can update its semaphore and any 
policy objects alongside the pools).
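
   A minimal sketch of what such a resize hook could look like, written against plain `java.util.concurrent` rather than Pinot's classes. `ResizeHookSketch`, `ResizeListener`, `AdjustableSemaphore`, and `PoolOwner` are hypothetical names used only for illustration; they are not existing Pinot types, and the real `PriorityScheduler` and `ResourceLimitPolicy` wiring would differ. The sketch only shows that a value captured at construction time stays stale unless the pool owner notifies its dependents:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;

public class ResizeHookSketch {
  /** Hypothetical callback for components that derive limits from the pool size. */
  public interface ResizeListener {
    void onResize(int newSize);
  }

  /** Semaphore whose permit ceiling can be changed after construction. */
  public static final class AdjustableSemaphore extends Semaphore {
    private int _maxPermits;

    public AdjustableSemaphore(int permits) {
      super(permits);
      _maxPermits = permits;
    }

    public synchronized void resize(int newMax) {
      int delta = newMax - _maxPermits;
      if (delta > 0) {
        release(delta);          // growing: hand out the extra permits
      } else if (delta < 0) {
        reducePermits(-delta);   // shrinking: never blocks; count may go negative until holders release
      }
      _maxPermits = newMax;
    }
  }

  /** Stand-in for the component that owns the executor (ResourceManager in the PR). */
  public static final class PoolOwner {
    private final ThreadPoolExecutor _pool;
    private final List<ResizeListener> _listeners = new CopyOnWriteArrayList<>();

    public PoolOwner(int threads) {
      _pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
    }

    public void addListener(ResizeListener listener) {
      _listeners.add(listener);
    }

    public int getPoolSize() {
      return _pool.getCorePoolSize();
    }

    public void resize(int newSize) {
      if (newSize <= 0) {
        throw new IllegalArgumentException("newSize must be > 0");
      }
      // Order matters: core size must never exceed maximum size.
      if (newSize > _pool.getMaximumPoolSize()) {
        _pool.setMaximumPoolSize(newSize);
        _pool.setCorePoolSize(newSize);
      } else {
        _pool.setCorePoolSize(newSize);
        _pool.setMaximumPoolSize(newSize);
      }
      // Propagate the new size to every dependent limit.
      for (ResizeListener listener : _listeners) {
        listener.onResize(newSize);
      }
    }

    public void shutdown() {
      _pool.shutdown();
    }
  }

  public static void main(String[] args) {
    PoolOwner owner = new PoolOwner(2);
    int snapshot = owner.getPoolSize();  // captured once, like a final _numRunners field
    AdjustableSemaphore running = new AdjustableSemaphore(snapshot);
    owner.addListener(running::resize);  // the hook that keeps the semaphore in sync
    owner.resize(4);
    System.out.println("snapshot=" + snapshot + " pool=" + owner.getPoolSize()
        + " permits=" + running.availablePermits());
    owner.shutdown();
  }
}
```

   Without the `addListener` call, the semaphore would keep its original permit count after `resize(4)`, which is the mismatch described above. Whether the hook belongs in `ResourceManager` or in `QueryScheduler` is a design decision for the PR.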



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

