hongkunxu commented on code in PR #18047:
URL: https://github.com/apache/pinot/pull/18047#discussion_r3025589772


##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java:
##########
@@ -85,20 +102,94 @@ public ResourceManager(PinotConfiguration config) {
     ThreadFactory queryRunnerFactory = new 
TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false,
         CommonConstants.ExecutorService.PINOT_QUERY_RUNNER_NAME_FORMAT);
 
-    ExecutorService runnerService = 
Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
-    runnerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
-        runnerService, config, "query runner");
+    _queryRunnerPool = (ThreadPoolExecutor) 
Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
+    ExecutorService runnerService = 
ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+        _queryRunnerPool, config, "query runner");
     _queryRunners = MoreExecutors.listeningDecorator(runnerService);
 
     // pqw -> pinot query workers
     ThreadFactory queryWorkersFactory = new 
TracedThreadFactory(Thread.NORM_PRIORITY, false,
         CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
-    ExecutorService workerService = 
Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
-    workerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
-        workerService, config, "query worker");
+    _queryWorkerPool = (ThreadPoolExecutor) 
Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
+    ExecutorService workerService = 
ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+        _queryWorkerPool, config, "query worker");
     _queryWorkers = MoreExecutors.listeningDecorator(workerService);
   }
 
+  /**
+   * Dynamically resizes the query runner and worker thread pools. Resizing is 
performed on the underlying
+   * {@link ThreadPoolExecutor} instances, which is transparent to any 
decorator wrappers.
+   *
+   * @param newRunnerThreads desired number of query runner threads (must be 
> 0)
+   * @param newWorkerThreads desired number of query worker threads (must be 
> 0)
+   */
+  public void resizeThreadPools(int newRunnerThreads, int newWorkerThreads) {
+    if (newRunnerThreads <= 0 || newWorkerThreads <= 0) {
+      LOGGER.warn("Invalid thread pool sizes: runnerThreads={}, 
workerThreads={}. Sizes must be > 0. Skipping resize.",
+          newRunnerThreads, newWorkerThreads);
+      return;
+    }
+
+    int oldRunnerThreads = _numQueryRunnerThreads;
+    int oldWorkerThreads = _numQueryWorkerThreads;
+
+    if (oldRunnerThreads == newRunnerThreads && oldWorkerThreads == 
newWorkerThreads) {
+      LOGGER.debug("Thread pool sizes unchanged (runner={}, worker={}). 
Skipping resize.",
+          newRunnerThreads, newWorkerThreads);
+      return;
+    }
+
+    resizePool(_queryRunnerPool, oldRunnerThreads, newRunnerThreads, 
"queryRunner");
+    _numQueryRunnerThreads = newRunnerThreads;
+
+    resizePool(_queryWorkerPool, oldWorkerThreads, newWorkerThreads, 
"queryWorker");
+    _numQueryWorkerThreads = newWorkerThreads;
+
+    LOGGER.info("Resized thread pools: runner {} -> {}, worker {} -> {}",
+        oldRunnerThreads, newRunnerThreads, oldWorkerThreads, 
newWorkerThreads);
+
+    onThreadPoolsResized(newRunnerThreads, newWorkerThreads);
+    for (ThreadPoolResizeListener listener : _resizeListeners) {
+      listener.onThreadPoolsResized(newRunnerThreads, newWorkerThreads);
+    }
+  }
+
+  /**
+   * Registers a listener to be notified after thread pools are resized.
+   */
+  public void addThreadPoolResizeListener(ThreadPoolResizeListener listener) {
+    _resizeListeners.add(listener);
+  }
+
+  /**
+   * Hook for subclasses to update dependent state (e.g. resource limit 
policies) after thread pools are resized.
+   * Called before external listeners are notified.
+   */
+  protected void onThreadPoolsResized(int newRunnerThreads, int 
newWorkerThreads) {
+  }
+
+  private static void resizePool(ThreadPoolExecutor pool, int oldSize, int 
newSize, String poolName) {
+    if (oldSize == newSize) {
+      return;
+    }
+    // Scale up:
+    // Increase maximumPoolSize first, then corePoolSize.
+    // If corePoolSize is increased first, it may temporarily exceed 
maximumPoolSize,
+    // which would cause an IllegalArgumentException.
+    if (newSize > oldSize) {
+      pool.setMaximumPoolSize(newSize);
+      pool.setCorePoolSize(newSize);
+    } else {
+      // Scale down:
+      // Decrease corePoolSize first, then maximumPoolSize.
+      // If maximumPoolSize is decreased first, it may become smaller than 
corePoolSize,
+      // which would also cause an IllegalArgumentException.
+      pool.setCorePoolSize(newSize);
+      pool.setMaximumPoolSize(newSize);
+    }
+    LOGGER.info("Resized {} pool: {} -> {}", poolName, oldSize, newSize);

Review Comment:
   This is not a frequently updated operation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to