This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e10a36af61 [enhance] Support dynamically modify server thread pool 
(#18047)
7e10a36af61 is described below

commit 7e10a36af617134827631980e3701ffa18a39b14
Author: Hongkun Xu <[email protected]>
AuthorDate: Sun Apr 12 16:13:35 2026 +0800

    [enhance] Support dynamically modify server thread pool (#18047)
    
    * Support dynamically modifying server thread pool
    
    Signed-off-by: Hongkun Xu <[email protected]>
    
    * add relevant class modifications
    
    Signed-off-by: Hongkun Xu <[email protected]>
    
    * add synchronization for resizePool method
    
    Signed-off-by: Hongkun Xu <[email protected]>
    
    * support rollback to default query thread pool config
    
    Signed-off-by: Hongkun Xu <[email protected]>
    
    * add patch for lucene thread pool
    
    Signed-off-by: Hongkun Xu <[email protected]>
    
    ---------
    
    Signed-off-by: Hongkun Xu <[email protected]>
---
 .../core/query/scheduler/PriorityScheduler.java    |  32 +++-
 .../pinot/core/query/scheduler/QueryScheduler.java |   4 +
 ...erySchedulerThreadPoolConfigChangeListener.java |  96 ++++++++++
 .../resources/BinaryWorkloadResourceManager.java   |   8 +-
 .../resources/PolicyBasedResourceManager.java      |   7 +-
 .../query/scheduler/resources/ResourceManager.java | 107 ++++++++++-
 .../query/scheduler/PrioritySchedulerTest.java     |  29 ++-
 ...chedulerThreadPoolConfigChangeListenerTest.java | 213 +++++++++++++++++++++
 .../resources/PolicyBasedResourceManagerTest.java  |  76 ++++++++
 .../scheduler/resources/ResourceManagerTest.java   | 107 ++++++++++-
 .../RealtimeLuceneTextIndexSearcherPool.java       |  47 ++++-
 .../RealtimeLuceneTextIndexSearcherPoolTest.java   |  78 ++++++++
 .../server/starter/helix/BaseServerStarter.java    |   9 +
 13 files changed, 792 insertions(+), 21 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
index 7a7a5d7b7c2..15dbd600fde 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
@@ -42,11 +42,31 @@ import org.slf4j.LoggerFactory;
 public abstract class PriorityScheduler extends QueryScheduler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PriorityScheduler.class);
 
+  /**
+   * A {@link Semaphore} subclass that supports adjusting the number of 
permits at runtime.
+   * This is needed because {@link Semaphore#reducePermits(int)} is protected.
+   */
+  @VisibleForTesting
+  static class ResizableSemaphore extends Semaphore {
+    ResizableSemaphore(int permits) {
+      super(permits);
+    }
+
+    void resize(int oldPermits, int newPermits) {
+      int delta = newPermits - oldPermits;
+      if (delta > 0) {
+        release(delta);
+      } else if (delta < 0) {
+        reducePermits(-delta);
+      }
+    }
+  }
+
   protected final SchedulerPriorityQueue _queryQueue;
 
   @VisibleForTesting
-  protected final Semaphore _runningQueriesSemaphore;
-  private final int _numRunners;
+  protected final ResizableSemaphore _runningQueriesSemaphore;
+  private volatile int _numRunners;
   @VisibleForTesting
   Thread _scheduler;
 
@@ -56,7 +76,13 @@ public abstract class PriorityScheduler extends 
QueryScheduler {
     super(config, instanceId, queryExecutor, threadAccountant, 
latestQueryTime, resourceManager);
     _queryQueue = queue;
     _numRunners = resourceManager.getNumQueryRunnerThreads();
-    _runningQueriesSemaphore = new Semaphore(_numRunners);
+    _runningQueriesSemaphore = new ResizableSemaphore(_numRunners);
+    resourceManager.addThreadPoolResizeListener((newRunnerThreads, 
newWorkerThreads) -> {
+      int oldRunners = _numRunners;
+      _runningQueriesSemaphore.resize(oldRunners, newRunnerThreads);
+      _numRunners = newRunnerThreads;
+      LOGGER.info("Resized running queries semaphore: {} -> {}", oldRunners, 
newRunnerThreads);
+    });
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 9d488f7ee0f..3f7e891c61a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -80,6 +80,10 @@ public abstract class QueryScheduler {
     _resourceManager = resourceManager;
   }
 
+  public ResourceManager getResourceManager() {
+    return _resourceManager;
+  }
+
   /**
    * Submit a query for execution. The query will be scheduled for execution 
as per the scheduling algorithm
    * @param queryRequest query to schedule for execution
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerThreadPoolConfigChangeListener.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerThreadPoolConfigChangeListener.java
new file mode 100644
index 00000000000..9c2efd9ad6b
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerThreadPoolConfigChangeListener.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Listens for cluster config changes to dynamically resize the query 
scheduler thread pools
+ * ({@code query_runner_threads} and {@code query_worker_threads}).
+ */
+public class QuerySchedulerThreadPoolConfigChangeListener implements 
PinotClusterConfigChangeListener {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(QuerySchedulerThreadPoolConfigChangeListener.class);
+
+  static final String QUERY_RUNNER_THREADS_KEY =
+      CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + 
ResourceManager.QUERY_RUNNER_CONFIG_KEY;
+  static final String QUERY_WORKER_THREADS_KEY =
+      CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." + 
ResourceManager.QUERY_WORKER_CONFIG_KEY;
+
+  private final ResourceManager _resourceManager;
+
+  public QuerySchedulerThreadPoolConfigChangeListener(ResourceManager 
resourceManager) {
+    _resourceManager = resourceManager;
+  }
+
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> 
clusterConfigs) {
+    if (!changedConfigs.contains(QUERY_RUNNER_THREADS_KEY)
+        && !changedConfigs.contains(QUERY_WORKER_THREADS_KEY)) {
+      return;
+    }
+
+    int newRunnerThreads = _resourceManager.getNumQueryRunnerThreads();
+    int newWorkerThreads = _resourceManager.getNumQueryWorkerThreads();
+
+    if (changedConfigs.contains(QUERY_RUNNER_THREADS_KEY)) {
+      if (clusterConfigs.containsKey(QUERY_RUNNER_THREADS_KEY)) {
+        try {
+          newRunnerThreads = 
Integer.parseInt(clusterConfigs.get(QUERY_RUNNER_THREADS_KEY));
+        } catch (NumberFormatException e) {
+          LOGGER.error("Invalid value for {}: '{}'. Keeping current value: {}",
+              QUERY_RUNNER_THREADS_KEY, 
clusterConfigs.get(QUERY_RUNNER_THREADS_KEY),
+              _resourceManager.getNumQueryRunnerThreads(), e);
+          return;
+        }
+      } else {
+        newRunnerThreads = ResourceManager.DEFAULT_QUERY_RUNNER_THREADS;
+        LOGGER.info("Key '{}' was removed from cluster config. Reverting to 
system default: {}",
+            QUERY_RUNNER_THREADS_KEY, newRunnerThreads);
+      }
+    }
+
+    if (changedConfigs.contains(QUERY_WORKER_THREADS_KEY)) {
+      if (clusterConfigs.containsKey(QUERY_WORKER_THREADS_KEY)) {
+        try {
+          newWorkerThreads = 
Integer.parseInt(clusterConfigs.get(QUERY_WORKER_THREADS_KEY));
+        } catch (NumberFormatException e) {
+          LOGGER.error("Invalid value for {}: '{}'. Keeping current value: {}",
+              QUERY_WORKER_THREADS_KEY, 
clusterConfigs.get(QUERY_WORKER_THREADS_KEY),
+              _resourceManager.getNumQueryWorkerThreads(), e);
+          return;
+        }
+      } else {
+        newWorkerThreads = ResourceManager.DEFAULT_QUERY_WORKER_THREADS;
+        LOGGER.info("Key '{}' was removed from cluster config. Reverting to 
system default: {}",
+            QUERY_WORKER_THREADS_KEY, newWorkerThreads);
+      }
+    }
+
+    LOGGER.info("Cluster config change detected. Resizing query scheduler 
thread pools: runner={}, worker={}",
+        newRunnerThreads, newWorkerThreads);
+    _resourceManager.resizeThreadPools(newRunnerThreads, newWorkerThreads);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
index fb7ffd93e3f..dcc25ac4e06 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
@@ -32,13 +32,19 @@ import org.slf4j.LoggerFactory;
  */
 public class BinaryWorkloadResourceManager extends ResourceManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BinaryWorkloadResourceManager.class);
-  private final ResourceLimitPolicy _secondaryWorkloadPolicy;
+  // Volatile to ensure visibility across query threads when the policy is 
replaced by onThreadPoolsResized()
+  private volatile ResourceLimitPolicy _secondaryWorkloadPolicy;
 
   public BinaryWorkloadResourceManager(PinotConfiguration config) {
     super(config);
     _secondaryWorkloadPolicy = new ResourceLimitPolicy(config, 
_numQueryWorkerThreads);
   }
 
+  @Override
+  protected void onThreadPoolsResized(int newRunnerThreads, int 
newWorkerThreads) {
+    _secondaryWorkloadPolicy = new ResourceLimitPolicy(_config, 
newWorkerThreads);
+  }
+
   /**
    * Returns an executor service that query executor can use like a dedicated
    * service for submitting jobs for parallel execution.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
index dd2c942c565..8e32a3f2c77 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
@@ -32,13 +32,18 @@ import org.slf4j.LoggerFactory;
 public class PolicyBasedResourceManager extends ResourceManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PolicyBasedResourceManager.class);
 
-  private final ResourceLimitPolicy _resourcePolicy;
+  private volatile ResourceLimitPolicy _resourcePolicy;
 
   public PolicyBasedResourceManager(PinotConfiguration config) {
     super(config);
     _resourcePolicy = new ResourceLimitPolicy(config, _numQueryWorkerThreads);
   }
 
+  @Override
+  protected void onThreadPoolsResized(int newRunnerThreads, int 
newWorkerThreads) {
+    _resourcePolicy = new ResourceLimitPolicy(_config, newWorkerThreads);
+  }
+
   /**
    * Returns an executor service that query executor can use like a dedicated
    * service for submitting jobs for parallel execution.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
index 4f156b52feb..cc3813ec5aa 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
@@ -21,9 +21,12 @@ package org.apache.pinot.core.query.scheduler.resources;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.pinot.core.executor.ThrottleOnCriticalHeapUsageExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
@@ -59,6 +62,14 @@ public abstract class ResourceManager {
     DEFAULT_QUERY_WORKER_THREADS = 2 * numCores;
   }
 
+  /**
+   * Listener notified after thread pools have been resized.
+   */
+  @FunctionalInterface
+  public interface ThreadPoolResizeListener {
+    void onThreadPoolsResized(int newRunnerThreads, int newWorkerThreads);
+  }
+
   // set the main query runner priority higher than NORM but lower than MAX
   // because if a query is complete we want to deserialize and return response 
as soon
   // as possible
@@ -67,15 +78,21 @@ public abstract class ResourceManager {
   // including planning, distributing operators across threads, waiting and
   // reducing the results from the parallel set of operators (CombineOperator)
   //
+  protected final PinotConfiguration _config;
   protected final ListeningExecutorService _queryRunners;
   protected final ListeningExecutorService _queryWorkers;
-  protected final int _numQueryRunnerThreads;
-  protected final int _numQueryWorkerThreads;
+  protected final ThreadPoolExecutor _queryRunnerPool;
+  protected final ThreadPoolExecutor _queryWorkerPool;
+  protected volatile int _numQueryRunnerThreads;
+  protected volatile int _numQueryWorkerThreads;
+
+  private final List<ThreadPoolResizeListener> _resizeListeners = new 
CopyOnWriteArrayList<>();
 
   /**
    * @param config configuration for initializing resource manager
    */
   public ResourceManager(PinotConfiguration config) {
+    _config = config;
     _numQueryRunnerThreads = config.getProperty(QUERY_RUNNER_CONFIG_KEY, 
DEFAULT_QUERY_RUNNER_THREADS);
     _numQueryWorkerThreads = config.getProperty(QUERY_WORKER_CONFIG_KEY, 
DEFAULT_QUERY_WORKER_THREADS);
 
@@ -85,20 +102,94 @@ public abstract class ResourceManager {
     ThreadFactory queryRunnerFactory = new 
TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false,
         CommonConstants.ExecutorService.PINOT_QUERY_RUNNER_NAME_FORMAT);
 
-    ExecutorService runnerService = 
Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
-    runnerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
-        runnerService, config, "query runner");
+    _queryRunnerPool = (ThreadPoolExecutor) 
Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
+    ExecutorService runnerService = 
ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+        _queryRunnerPool, config, "query runner");
     _queryRunners = MoreExecutors.listeningDecorator(runnerService);
 
     // pqw -> pinot query workers
     ThreadFactory queryWorkersFactory = new 
TracedThreadFactory(Thread.NORM_PRIORITY, false,
         CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
-    ExecutorService workerService = 
Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
-    workerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
-        workerService, config, "query worker");
+    _queryWorkerPool = (ThreadPoolExecutor) 
Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
+    ExecutorService workerService = 
ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+        _queryWorkerPool, config, "query worker");
     _queryWorkers = MoreExecutors.listeningDecorator(workerService);
   }
 
+  /**
+   * Dynamically resizes the query runner and worker thread pools. Resizing is 
performed on the underlying
+   * {@link ThreadPoolExecutor} instances, which is transparent to any 
decorator wrappers.
+   *
+   * @param newRunnerThreads desired number of query runner threads (must be 
&gt; 0)
+   * @param newWorkerThreads desired number of query worker threads (must be 
&gt; 0)
+   */
+  public synchronized void resizeThreadPools(int newRunnerThreads, int 
newWorkerThreads) {
+    if (newRunnerThreads <= 0 || newWorkerThreads <= 0) {
+      LOGGER.warn("Invalid thread pool sizes: runnerThreads={}, 
workerThreads={}. Sizes must be > 0. Skipping resize.",
+          newRunnerThreads, newWorkerThreads);
+      return;
+    }
+
+    int oldRunnerThreads = _numQueryRunnerThreads;
+    int oldWorkerThreads = _numQueryWorkerThreads;
+
+    if (oldRunnerThreads == newRunnerThreads && oldWorkerThreads == 
newWorkerThreads) {
+      LOGGER.debug("Thread pool sizes unchanged (runner={}, worker={}). 
Skipping resize.",
+          newRunnerThreads, newWorkerThreads);
+      return;
+    }
+
+    resizePool(_queryRunnerPool, oldRunnerThreads, newRunnerThreads, 
"queryRunner");
+    _numQueryRunnerThreads = newRunnerThreads;
+
+    resizePool(_queryWorkerPool, oldWorkerThreads, newWorkerThreads, 
"queryWorker");
+    _numQueryWorkerThreads = newWorkerThreads;
+
+    LOGGER.info("Resized thread pools: runner {} -> {}, worker {} -> {}",
+        oldRunnerThreads, newRunnerThreads, oldWorkerThreads, 
newWorkerThreads);
+
+    onThreadPoolsResized(newRunnerThreads, newWorkerThreads);
+    for (ThreadPoolResizeListener listener : _resizeListeners) {
+      listener.onThreadPoolsResized(newRunnerThreads, newWorkerThreads);
+    }
+  }
+
+  /**
+   * Registers a listener to be notified after thread pools are resized.
+   */
+  public void addThreadPoolResizeListener(ThreadPoolResizeListener listener) {
+    _resizeListeners.add(listener);
+  }
+
+  /**
+   * Hook for subclasses to update dependent state (e.g. resource limit 
policies) after thread pools are resized.
+   * Called before external listeners are notified.
+   */
+  protected void onThreadPoolsResized(int newRunnerThreads, int 
newWorkerThreads) {
+  }
+
+  private synchronized void resizePool(ThreadPoolExecutor pool, int oldSize, 
int newSize, String poolName) {
+    if (oldSize == newSize) {
+      return;
+    }
+    // Scale up:
+    // Increase maximumPoolSize first, then corePoolSize.
+    // If corePoolSize is increased first, it may temporarily exceed 
maximumPoolSize,
+    // which would cause an IllegalArgumentException.
+    if (newSize > oldSize) {
+      pool.setMaximumPoolSize(newSize);
+      pool.setCorePoolSize(newSize);
+    } else {
+      // Scale down:
+      // Decrease corePoolSize first, then maximumPoolSize.
+      // If maximumPoolSize is decreased first, it may become smaller than 
corePoolSize,
+      // which would also cause an IllegalArgumentException.
+      pool.setCorePoolSize(newSize);
+      pool.setMaximumPoolSize(newSize);
+    }
+    LOGGER.info("Resized {} pool: {} -> {}", poolName, oldSize, newSize);
+  }
+
   public void stop() {
     _queryWorkers.shutdownNow();
     _queryRunners.shutdownNow();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index fa7a2a0584d..141fcf89863 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -235,6 +235,32 @@ public class PrioritySchedulerTest {
     scheduler.stop();
   }
 
+  @Test
+  public void testResizeUpdatesSemaphoreIncrease() {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(ResourceManager.QUERY_RUNNER_CONFIG_KEY, 4);
+    properties.put(ResourceManager.QUERY_WORKER_CONFIG_KEY, 8);
+    TestPriorityScheduler scheduler = TestPriorityScheduler.create(new 
PinotConfiguration(properties));
+    assertEquals(scheduler.getRunningQueriesSemaphore().availablePermits(), 4);
+
+    scheduler.getResourceManager().resizeThreadPools(8, 16);
+    assertEquals(scheduler.getRunningQueriesSemaphore().availablePermits(), 8);
+    scheduler.stop();
+  }
+
+  @Test
+  public void testResizeUpdatesSemaphoreDecrease() {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(ResourceManager.QUERY_RUNNER_CONFIG_KEY, 8);
+    properties.put(ResourceManager.QUERY_WORKER_CONFIG_KEY, 16);
+    TestPriorityScheduler scheduler = TestPriorityScheduler.create(new 
PinotConfiguration(properties));
+    assertEquals(scheduler.getRunningQueriesSemaphore().availablePermits(), 8);
+
+    scheduler.getResourceManager().resizeThreadPools(4, 8);
+    assertEquals(scheduler.getRunningQueriesSemaphore().availablePermits(), 4);
+    scheduler.stop();
+  }
+
   static class TestPriorityScheduler extends PriorityScheduler {
     static TestSchedulerGroupFactory _groupFactory;
     static LongAccumulator _latestQueryTime;
@@ -261,7 +287,8 @@ public class PrioritySchedulerTest {
       return create(new PinotConfiguration());
     }
 
-    ResourceManager getResourceManager() {
+    @Override
+    public ResourceManager getResourceManager() {
       return _resourceManager;
     }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerThreadPoolConfigChangeListenerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerThreadPoolConfigChangeListenerTest.java
new file mode 100644
index 00000000000..5eb4fa78fe6
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerThreadPoolConfigChangeListenerTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class QuerySchedulerThreadPoolConfigChangeListenerTest {
+
+  @Test
+  public void testOnChangeWithRelevantKeys() {
+    ResourceManager resourceManager = mock(ResourceManager.class);
+    when(resourceManager.getNumQueryRunnerThreads()).thenReturn(4);
+    when(resourceManager.getNumQueryWorkerThreads()).thenReturn(8);
+
+    QuerySchedulerThreadPoolConfigChangeListener listener =
+        new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    
clusterConfigs.put(QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY,
 "10");
+    
clusterConfigs.put(QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY,
 "20");
+
+    Set<String> changedConfigs = Set.of(
+        QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY,
+        QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY);
+
+    listener.onChange(changedConfigs, clusterConfigs);
+
+    verify(resourceManager).resizeThreadPools(10, 20);
+  }
+
+  @Test
+  public void testOnChangeRunnerOnly() {
+    ResourceManager resourceManager = mock(ResourceManager.class);
+    when(resourceManager.getNumQueryRunnerThreads()).thenReturn(4);
+    when(resourceManager.getNumQueryWorkerThreads()).thenReturn(8);
+
+    QuerySchedulerThreadPoolConfigChangeListener listener =
+        new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    
clusterConfigs.put(QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY,
 "10");
+
+    Set<String> changedConfigs = Set.of(
+        QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY);
+
+    listener.onChange(changedConfigs, clusterConfigs);
+
+    verify(resourceManager).resizeThreadPools(10, 8);
+  }
+
+  @Test
+  public void testOnChangeWorkerOnly() {
+    ResourceManager resourceManager = mock(ResourceManager.class);
+    when(resourceManager.getNumQueryRunnerThreads()).thenReturn(4);
+    when(resourceManager.getNumQueryWorkerThreads()).thenReturn(8);
+
+    QuerySchedulerThreadPoolConfigChangeListener listener =
+        new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    
clusterConfigs.put(QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY,
 "20");
+
+    Set<String> changedConfigs = Set.of(
+        QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY);
+
+    listener.onChange(changedConfigs, clusterConfigs);
+
+    verify(resourceManager).resizeThreadPools(4, 20);
+  }
+
+  @Test
+  public void testOnChangeUnrelatedKeys() {
+    ResourceManager resourceManager = mock(ResourceManager.class);
+
+    QuerySchedulerThreadPoolConfigChangeListener listener =
+        new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    clusterConfigs.put("some.other.config", "value");
+
+    Set<String> changedConfigs = Set.of("some.other.config");
+
+    listener.onChange(changedConfigs, clusterConfigs);
+
+    verify(resourceManager, never()).resizeThreadPools(anyInt(), anyInt());
+  }
+
+  @Test
+  public void testOnChangeInvalidRunnerValue() {
+    ResourceManager resourceManager = mock(ResourceManager.class);
+    when(resourceManager.getNumQueryRunnerThreads()).thenReturn(4);
+    when(resourceManager.getNumQueryWorkerThreads()).thenReturn(8);
+
+    QuerySchedulerThreadPoolConfigChangeListener listener =
+        new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    
clusterConfigs.put(QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY,
 "not_a_number");
+
+    Set<String> changedConfigs = Set.of(
+        QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY);
+
+    listener.onChange(changedConfigs, clusterConfigs);
+
+    verify(resourceManager, never()).resizeThreadPools(anyInt(), anyInt());
+  }
+
+  @Test
+  public void testOnChangeInvalidWorkerValue() {
+    ResourceManager resourceManager = mock(ResourceManager.class);
+    when(resourceManager.getNumQueryRunnerThreads()).thenReturn(4);
+    when(resourceManager.getNumQueryWorkerThreads()).thenReturn(8);
+
+    QuerySchedulerThreadPoolConfigChangeListener listener =
+        new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+    
clusterConfigs.put(QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY,
 "invalid");
+
+    Set<String> changedConfigs = Set.of(
+        QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY);
+
+    listener.onChange(changedConfigs, clusterConfigs);
+
+    verify(resourceManager, never()).resizeThreadPools(anyInt(), anyInt());
+  }
+
+  @Test
+  public void testOnChangeDeletedRunnerKeyRevertsToDefault() {
+    ResourceManager resourceManager = mock(ResourceManager.class);
+    when(resourceManager.getNumQueryRunnerThreads()).thenReturn(32);
+    when(resourceManager.getNumQueryWorkerThreads()).thenReturn(8);
+
+    QuerySchedulerThreadPoolConfigChangeListener listener =
+        new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+
+    Set<String> changedConfigs = Set.of(
+        QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY);
+
+    listener.onChange(changedConfigs, clusterConfigs);
+
+    
verify(resourceManager).resizeThreadPools(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
 8);
+  }
+
+  @Test
+  public void testOnChangeDeletedWorkerKeyRevertsToDefault() {
+    ResourceManager resourceManager = mock(ResourceManager.class);
+    when(resourceManager.getNumQueryRunnerThreads()).thenReturn(4);
+    when(resourceManager.getNumQueryWorkerThreads()).thenReturn(64);
+
+    QuerySchedulerThreadPoolConfigChangeListener listener =
+        new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+
+    Set<String> changedConfigs = Set.of(
+        QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY);
+
+    listener.onChange(changedConfigs, clusterConfigs);
+
+    verify(resourceManager).resizeThreadPools(4, 
ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
+  }
+
+  @Test
+  public void testOnChangeBothKeysDeletedRevertToDefaults() {
+    ResourceManager resourceManager = mock(ResourceManager.class);
+    when(resourceManager.getNumQueryRunnerThreads()).thenReturn(32);
+    when(resourceManager.getNumQueryWorkerThreads()).thenReturn(64);
+
+    QuerySchedulerThreadPoolConfigChangeListener listener =
+        new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+    Map<String, String> clusterConfigs = new HashMap<>();
+
+    Set<String> changedConfigs = Set.of(
+        QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY,
+        QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY);
+
+    listener.onChange(changedConfigs, clusterConfigs);
+
+    verify(resourceManager).resizeThreadPools(
+        ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, 
ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManagerTest.java
new file mode 100644
index 00000000000..a8d01ea344b
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManagerTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler.resources;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+
+
+public class PolicyBasedResourceManagerTest {
+  private PolicyBasedResourceManager _resourceManager;
+
+  @AfterMethod
+  public void tearDown() {
+    if (_resourceManager != null) {
+      _resourceManager.stop();
+      _resourceManager = null;
+    }
+  }
+
+  @Test
+  public void testResizeUpdatesResourceLimitPolicy() {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(ResourceManager.QUERY_RUNNER_CONFIG_KEY, 4);
+    properties.put(ResourceManager.QUERY_WORKER_CONFIG_KEY, 20);
+    properties.put(ResourceLimitPolicy.TABLE_THREADS_HARD_LIMIT, 50);
+    properties.put(ResourceLimitPolicy.TABLE_THREADS_SOFT_LIMIT, 30);
+    _resourceManager = new PolicyBasedResourceManager(new PinotConfiguration(properties));
+
+    int originalHardLimit = _resourceManager.getTableThreadsHardLimit();
+    int originalSoftLimit = _resourceManager.getTableThreadsSoftLimit();
+
+    _resourceManager.resizeThreadPools(8, 40);
+
+    assertNotEquals(_resourceManager.getTableThreadsHardLimit(), originalHardLimit);
+    assertNotEquals(_resourceManager.getTableThreadsSoftLimit(), originalSoftLimit);
+    assertEquals(_resourceManager.getNumQueryWorkerThreads(), 40);
+  }
+
+  @Test
+  public void testResizeDoesNotUpdatePolicyOnNoChange() {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(ResourceManager.QUERY_RUNNER_CONFIG_KEY, 4);
+    properties.put(ResourceManager.QUERY_WORKER_CONFIG_KEY, 20);
+    _resourceManager = new PolicyBasedResourceManager(new PinotConfiguration(properties));
+
+    int originalHardLimit = _resourceManager.getTableThreadsHardLimit();
+    int originalSoftLimit = _resourceManager.getTableThreadsSoftLimit();
+
+    _resourceManager.resizeThreadPools(4, 20);
+
+    assertEquals(_resourceManager.getTableThreadsHardLimit(), originalHardLimit);
+    assertEquals(_resourceManager.getTableThreadsSoftLimit(), originalSoftLimit);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
index 0b8feda8ba6..d0b34b0f849 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
@@ -20,29 +20,130 @@ package org.apache.pinot.core.query.scheduler.resources;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 
 public class ResourceManagerTest {
+  private ResourceManager _resourceManager;
+
+  @AfterMethod
+  public void tearDown() {
+    if (_resourceManager != null) {
+      _resourceManager.stop();
+      _resourceManager = null;
+    }
+  }
 
   @Test
   public void testCanSchedule() {
-    ResourceManager rm = getResourceManager(2, 5, 1, 3);
+    _resourceManager = getResourceManager(2, 5, 1, 3);
 
     SchedulerGroupAccountant accountant = mock(SchedulerGroupAccountant.class);
     when(accountant.totalReservedThreads()).thenReturn(3);
-    assertFalse(rm.canSchedule(accountant));
+    assertFalse(_resourceManager.canSchedule(accountant));
 
     when(accountant.totalReservedThreads()).thenReturn(2);
-    assertTrue(rm.canSchedule(accountant));
+    assertTrue(_resourceManager.canSchedule(accountant));
+  }
+
+  @Test
+  public void testResizeThreadPoolsIncrease() {
+    _resourceManager = getResourceManager(2, 4, 1, 3);
+    assertEquals(_resourceManager.getNumQueryRunnerThreads(), 2);
+    assertEquals(_resourceManager.getNumQueryWorkerThreads(), 4);
+
+    _resourceManager.resizeThreadPools(4, 8);
+    assertEquals(_resourceManager.getNumQueryRunnerThreads(), 4);
+    assertEquals(_resourceManager.getNumQueryWorkerThreads(), 8);
+    assertEquals(_resourceManager._queryRunnerPool.getCorePoolSize(), 4);
+    assertEquals(_resourceManager._queryRunnerPool.getMaximumPoolSize(), 4);
+    assertEquals(_resourceManager._queryWorkerPool.getCorePoolSize(), 8);
+    assertEquals(_resourceManager._queryWorkerPool.getMaximumPoolSize(), 8);
+  }
+
+  @Test
+  public void testResizeThreadPoolsDecrease() {
+    _resourceManager = getResourceManager(8, 16, 1, 3);
+    assertEquals(_resourceManager.getNumQueryRunnerThreads(), 8);
+    assertEquals(_resourceManager.getNumQueryWorkerThreads(), 16);
+
+    _resourceManager.resizeThreadPools(4, 8);
+    assertEquals(_resourceManager.getNumQueryRunnerThreads(), 4);
+    assertEquals(_resourceManager.getNumQueryWorkerThreads(), 8);
+    assertEquals(_resourceManager._queryRunnerPool.getCorePoolSize(), 4);
+    assertEquals(_resourceManager._queryRunnerPool.getMaximumPoolSize(), 4);
+    assertEquals(_resourceManager._queryWorkerPool.getCorePoolSize(), 8);
+    assertEquals(_resourceManager._queryWorkerPool.getMaximumPoolSize(), 8);
+  }
+
+  @Test
+  public void testResizeThreadPoolsNoChange() {
+    _resourceManager = getResourceManager(2, 4, 1, 3);
+    _resourceManager.resizeThreadPools(2, 4);
+    assertEquals(_resourceManager.getNumQueryRunnerThreads(), 2);
+    assertEquals(_resourceManager.getNumQueryWorkerThreads(), 4);
+  }
+
+  @Test
+  public void testResizeThreadPoolsInvalidValues() {
+    _resourceManager = getResourceManager(2, 4, 1, 3);
+
+    _resourceManager.resizeThreadPools(0, 4);
+    assertEquals(_resourceManager.getNumQueryRunnerThreads(), 2);
+    assertEquals(_resourceManager.getNumQueryWorkerThreads(), 4);
+
+    _resourceManager.resizeThreadPools(2, -1);
+    assertEquals(_resourceManager.getNumQueryRunnerThreads(), 2);
+    assertEquals(_resourceManager.getNumQueryWorkerThreads(), 4);
+  }
+
+  @Test
+  public void testResizeListenerCalledOnResize() {
+    _resourceManager = getResourceManager(2, 4, 1, 3);
+    AtomicInteger capturedRunners = new AtomicInteger(-1);
+    AtomicInteger capturedWorkers = new AtomicInteger(-1);
+    _resourceManager.addThreadPoolResizeListener((newRunnerThreads, newWorkerThreads) -> {
+      capturedRunners.set(newRunnerThreads);
+      capturedWorkers.set(newWorkerThreads);
+    });
+
+    _resourceManager.resizeThreadPools(6, 12);
+    assertEquals(capturedRunners.get(), 6);
+    assertEquals(capturedWorkers.get(), 12);
+  }
+
+  @Test
+  public void testResizeListenerNotCalledOnNoChange() {
+    _resourceManager = getResourceManager(2, 4, 1, 3);
+    AtomicInteger callCount = new AtomicInteger(0);
+    _resourceManager.addThreadPoolResizeListener((newRunnerThreads, newWorkerThreads) -> callCount.incrementAndGet());
+
+    _resourceManager.resizeThreadPools(2, 4);
+    assertEquals(callCount.get(), 0);
+  }
+
+  @Test
+  public void testResizeListenerNotCalledOnInvalidValues() {
+    _resourceManager = getResourceManager(2, 4, 1, 3);
+    AtomicInteger callCount = new AtomicInteger(0);
+    _resourceManager.addThreadPoolResizeListener((newRunnerThreads, newWorkerThreads) -> callCount.incrementAndGet());
+
+    _resourceManager.resizeThreadPools(0, 4);
+    assertEquals(callCount.get(), 0);
+
+    _resourceManager.resizeThreadPools(2, -1);
+    assertEquals(callCount.get(), 0);
   }
 
   private ResourceManager getResourceManager(int runners, int workers, final int softLimit, final int hardLimit) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
index edb8da4e2bd..faabf605154 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
@@ -19,8 +19,11 @@
 package org.apache.pinot.segment.local.realtime.impl.invertedindex;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.pinot.common.utils.ScalingThreadPoolExecutor;
 import org.apache.pinot.spi.query.QueryThreadContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -29,18 +32,25 @@ import org.apache.pinot.spi.query.QueryThreadContext;
  * an accompanying Lucene searcher thread if needed. init() is called in BaseServerStarter to avoid creating a
  * dependency on pinot-core.
  *
- * The executor is wrapped with QueryThreadContext.contextAwareExecutorService(executor, false) to propagate
+ * <p>The pool supports dynamic resizing via {@link #resize(int)} so that it stays in sync when
+ * query_worker_threads is changed at runtime through cluster config.
+ *
+ * <p>The executor is wrapped with QueryThreadContext.contextAwareExecutorService(executor, false) to propagate
  * QueryThreadContext for CPU/memory tracking, but WITHOUT registering tasks for cancellation. This prevents
  * Thread.interrupt() during Lucene search which could corrupt FSDirectory used by IndexWriter.
  * See https://github.com/apache/lucene/issues/3315 and https://github.com/apache/lucene/issues/9309
  */
 public class RealtimeLuceneTextIndexSearcherPool {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeLuceneTextIndexSearcherPool.class);
+
   private static RealtimeLuceneTextIndexSearcherPool _singletonInstance;
-  private static ExecutorService _executorService;
+
+  private final ThreadPoolExecutor _baseExecutor;
+  private final ExecutorService _executorService;
 
   private RealtimeLuceneTextIndexSearcherPool(int size) {
-    ExecutorService baseExecutor = ScalingThreadPoolExecutor.newScalingThreadPool(0, size, 500);
-    _executorService = QueryThreadContext.contextAwareExecutorService(baseExecutor, false);
+    _baseExecutor = (ThreadPoolExecutor) ScalingThreadPoolExecutor.newScalingThreadPool(0, size, 500);
+    _executorService = QueryThreadContext.contextAwareExecutorService(_baseExecutor, false);
   }
 
   public static RealtimeLuceneTextIndexSearcherPool getInstance() {
@@ -58,4 +68,33 @@ public class RealtimeLuceneTextIndexSearcherPool {
   public ExecutorService getExecutorService() {
     return _executorService;
   }
+
+  /**
+   * Dynamically resizes the maximum pool size of the underlying {@link ScalingThreadPoolExecutor}.
+   * Since the base executor has corePoolSize=0, only maximumPoolSize needs to be adjusted.
+   * When shrinking, idle threads are interrupted immediately; busy threads finish their current
+   * Lucene search task before exiting, so no in-flight search is disrupted.
+   *
+   * @param newMaxSize the new maximum thread count (must be &gt; 0)
+   */
+  public synchronized void resize(int newMaxSize) {
+    if (newMaxSize <= 0) {
+      LOGGER.warn("Invalid Lucene searcher pool size: {}. Must be > 0. Skipping resize.", newMaxSize);
+      return;
+    }
+    int oldMaxSize = _baseExecutor.getMaximumPoolSize();
+    if (oldMaxSize == newMaxSize) {
+      LOGGER.debug("Lucene searcher pool size unchanged at {}. Skipping resize.", newMaxSize);
+      return;
+    }
+    _baseExecutor.setMaximumPoolSize(newMaxSize);
+    LOGGER.info("Resized Lucene searcher pool: {} -> {}", oldMaxSize, newMaxSize);
+  }
+
+  /**
+   * Returns the current maximum pool size. Primarily for testing.
+   */
+  public int getMaxPoolSize() {
+    return _baseExecutor.getMaximumPoolSize();
+  }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPoolTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPoolTest.java
new file mode 100644
index 00000000000..2fcd9beb8e1
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPoolTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+public class RealtimeLuceneTextIndexSearcherPoolTest {
+
+  @Test
+  public void testInitAndGetInstance() {
+    RealtimeLuceneTextIndexSearcherPool pool = RealtimeLuceneTextIndexSearcherPool.init(4);
+    assertNotNull(pool);
+    assertEquals(pool.getMaxPoolSize(), 4);
+    assertNotNull(pool.getExecutorService());
+    assertEquals(RealtimeLuceneTextIndexSearcherPool.getInstance(), pool);
+  }
+
+  @Test
+  public void testResizeIncrease() {
+    RealtimeLuceneTextIndexSearcherPool pool = RealtimeLuceneTextIndexSearcherPool.init(4);
+    assertEquals(pool.getMaxPoolSize(), 4);
+
+    pool.resize(8);
+    assertEquals(pool.getMaxPoolSize(), 8);
+  }
+
+  @Test
+  public void testResizeDecrease() {
+    RealtimeLuceneTextIndexSearcherPool pool = RealtimeLuceneTextIndexSearcherPool.init(8);
+    assertEquals(pool.getMaxPoolSize(), 8);
+
+    pool.resize(4);
+    assertEquals(pool.getMaxPoolSize(), 4);
+  }
+
+  @Test
+  public void testResizeSameSize() {
+    RealtimeLuceneTextIndexSearcherPool pool = RealtimeLuceneTextIndexSearcherPool.init(4);
+    assertEquals(pool.getMaxPoolSize(), 4);
+
+    // No-op when size is unchanged
+    pool.resize(4);
+    assertEquals(pool.getMaxPoolSize(), 4);
+  }
+
+  @Test
+  public void testResizeInvalidSize() {
+    RealtimeLuceneTextIndexSearcherPool pool = RealtimeLuceneTextIndexSearcherPool.init(4);
+    assertEquals(pool.getMaxPoolSize(), 4);
+
+    // Zero and negative values should be ignored
+    pool.resize(0);
+    assertEquals(pool.getMaxPoolSize(), 4);
+
+    pool.resize(-1);
+    assertEquals(pool.getMaxPoolSize(), 4);
+  }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 3f544a96ead..fe16f0685ac 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -85,6 +85,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager;
 import org.apache.pinot.core.data.manager.realtime.ServerRateLimitConfigChangeListener;
 import org.apache.pinot.core.instance.context.ServerContext;
+import org.apache.pinot.core.query.scheduler.QuerySchedulerThreadPoolConfigChangeListener;
 import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.transport.NettyInspector;
@@ -803,6 +804,14 @@ public abstract class BaseServerStarter implements ServiceStartable {
     }
     _clusterConfigChangeHandler.registerClusterConfigChangeListener(_segmentOperationsThrottlerSet);
     _clusterConfigChangeHandler.registerClusterConfigChangeListener(keepPipelineBreakerStatsPredicate);
+    ResourceManager resourceManager = _serverInstance.getQueryScheduler().getResourceManager();
+    _clusterConfigChangeHandler.registerClusterConfigChangeListener(
+        new QuerySchedulerThreadPoolConfigChangeListener(resourceManager));
+
+    // Keep the Lucene searcher pool in sync with query_worker_threads changes
+    resourceManager.addThreadPoolResizeListener((newRunnerThreads, newWorkerThreads) -> {
+      _realtimeLuceneTextIndexSearcherPool.resize(newWorkerThreads);
+    });
 
     if (sendStatsPredicate.needWatchForInstanceConfigChange()) {
       LOGGER.info("Initializing and registering the SendStatsPredicate");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to