This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7e10a36af61 [enhance] Support dynamically modify server thread pool
(#18047)
7e10a36af61 is described below
commit 7e10a36af617134827631980e3701ffa18a39b14
Author: Hongkun Xu <[email protected]>
AuthorDate: Sun Apr 12 16:13:35 2026 +0800
[enhance] Support dynamically modify server thread pool (#18047)
* Support dynamically modifying server thread pool
Signed-off-by: Hongkun Xu <[email protected]>
* add relevant class modify
Signed-off-by: Hongkun Xu <[email protected]>
* add synchronization for resizePool method
Signed-off-by: Hongkun Xu <[email protected]>
* support rollback to default query thread pool config
Signed-off-by: Hongkun Xu <[email protected]>
* add patch for lucene thread pool
Signed-off-by: Hongkun Xu <[email protected]>
---------
Signed-off-by: Hongkun Xu <[email protected]>
---
.../core/query/scheduler/PriorityScheduler.java | 32 +++-
.../pinot/core/query/scheduler/QueryScheduler.java | 4 +
...erySchedulerThreadPoolConfigChangeListener.java | 96 ++++++++++
.../resources/BinaryWorkloadResourceManager.java | 8 +-
.../resources/PolicyBasedResourceManager.java | 7 +-
.../query/scheduler/resources/ResourceManager.java | 107 ++++++++++-
.../query/scheduler/PrioritySchedulerTest.java | 29 ++-
...chedulerThreadPoolConfigChangeListenerTest.java | 213 +++++++++++++++++++++
.../resources/PolicyBasedResourceManagerTest.java | 76 ++++++++
.../scheduler/resources/ResourceManagerTest.java | 107 ++++++++++-
.../RealtimeLuceneTextIndexSearcherPool.java | 47 ++++-
.../RealtimeLuceneTextIndexSearcherPoolTest.java | 78 ++++++++
.../server/starter/helix/BaseServerStarter.java | 9 +
13 files changed, 792 insertions(+), 21 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
index 7a7a5d7b7c2..15dbd600fde 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
@@ -42,11 +42,31 @@ import org.slf4j.LoggerFactory;
public abstract class PriorityScheduler extends QueryScheduler {
private static final Logger LOGGER =
LoggerFactory.getLogger(PriorityScheduler.class);
+ /**
+ * A {@link Semaphore} subclass that supports adjusting the number of
permits at runtime.
+ * This is needed because {@link Semaphore#reducePermits(int)} is protected.
+ */
+ @VisibleForTesting
+ static class ResizableSemaphore extends Semaphore {
+ ResizableSemaphore(int permits) {
+ super(permits);
+ }
+
+ void resize(int oldPermits, int newPermits) {
+ int delta = newPermits - oldPermits;
+ if (delta > 0) {
+ release(delta);
+ } else if (delta < 0) {
+ reducePermits(-delta);
+ }
+ }
+ }
+
protected final SchedulerPriorityQueue _queryQueue;
@VisibleForTesting
- protected final Semaphore _runningQueriesSemaphore;
- private final int _numRunners;
+ protected final ResizableSemaphore _runningQueriesSemaphore;
+ private volatile int _numRunners;
@VisibleForTesting
Thread _scheduler;
@@ -56,7 +76,13 @@ public abstract class PriorityScheduler extends
QueryScheduler {
super(config, instanceId, queryExecutor, threadAccountant,
latestQueryTime, resourceManager);
_queryQueue = queue;
_numRunners = resourceManager.getNumQueryRunnerThreads();
- _runningQueriesSemaphore = new Semaphore(_numRunners);
+ _runningQueriesSemaphore = new ResizableSemaphore(_numRunners);
+ resourceManager.addThreadPoolResizeListener((newRunnerThreads,
newWorkerThreads) -> {
+ int oldRunners = _numRunners;
+ _runningQueriesSemaphore.resize(oldRunners, newRunnerThreads);
+ _numRunners = newRunnerThreads;
+ LOGGER.info("Resized running queries semaphore: {} -> {}", oldRunners,
newRunnerThreads);
+ });
}
@Override
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 9d488f7ee0f..3f7e891c61a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -80,6 +80,10 @@ public abstract class QueryScheduler {
_resourceManager = resourceManager;
}
+ public ResourceManager getResourceManager() {
+ return _resourceManager;
+ }
+
/**
* Submit a query for execution. The query will be scheduled for execution
as per the scheduling algorithm
* @param queryRequest query to schedule for execution
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerThreadPoolConfigChangeListener.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerThreadPoolConfigChangeListener.java
new file mode 100644
index 00000000000..9c2efd9ad6b
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerThreadPoolConfigChangeListener.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Listens for cluster config changes to dynamically resize the query
scheduler thread pools
+ * ({@code query_runner_threads} and {@code query_worker_threads}).
+ */
+public class QuerySchedulerThreadPoolConfigChangeListener implements
PinotClusterConfigChangeListener {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QuerySchedulerThreadPoolConfigChangeListener.class);
+
+ static final String QUERY_RUNNER_THREADS_KEY =
+ CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." +
ResourceManager.QUERY_RUNNER_CONFIG_KEY;
+ static final String QUERY_WORKER_THREADS_KEY =
+ CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX + "." +
ResourceManager.QUERY_WORKER_CONFIG_KEY;
+
+ private final ResourceManager _resourceManager;
+
+ public QuerySchedulerThreadPoolConfigChangeListener(ResourceManager
resourceManager) {
+ _resourceManager = resourceManager;
+ }
+
+ @Override
+ public void onChange(Set<String> changedConfigs, Map<String, String>
clusterConfigs) {
+ if (!changedConfigs.contains(QUERY_RUNNER_THREADS_KEY)
+ && !changedConfigs.contains(QUERY_WORKER_THREADS_KEY)) {
+ return;
+ }
+
+ int newRunnerThreads = _resourceManager.getNumQueryRunnerThreads();
+ int newWorkerThreads = _resourceManager.getNumQueryWorkerThreads();
+
+ if (changedConfigs.contains(QUERY_RUNNER_THREADS_KEY)) {
+ if (clusterConfigs.containsKey(QUERY_RUNNER_THREADS_KEY)) {
+ try {
+ newRunnerThreads =
Integer.parseInt(clusterConfigs.get(QUERY_RUNNER_THREADS_KEY));
+ } catch (NumberFormatException e) {
+ LOGGER.error("Invalid value for {}: '{}'. Keeping current value: {}",
+ QUERY_RUNNER_THREADS_KEY,
clusterConfigs.get(QUERY_RUNNER_THREADS_KEY),
+ _resourceManager.getNumQueryRunnerThreads(), e);
+ return;
+ }
+ } else {
+ newRunnerThreads = ResourceManager.DEFAULT_QUERY_RUNNER_THREADS;
+ LOGGER.info("Key '{}' was removed from cluster config. Reverting to
system default: {}",
+ QUERY_RUNNER_THREADS_KEY, newRunnerThreads);
+ }
+ }
+
+ if (changedConfigs.contains(QUERY_WORKER_THREADS_KEY)) {
+ if (clusterConfigs.containsKey(QUERY_WORKER_THREADS_KEY)) {
+ try {
+ newWorkerThreads =
Integer.parseInt(clusterConfigs.get(QUERY_WORKER_THREADS_KEY));
+ } catch (NumberFormatException e) {
+ LOGGER.error("Invalid value for {}: '{}'. Keeping current value: {}",
+ QUERY_WORKER_THREADS_KEY,
clusterConfigs.get(QUERY_WORKER_THREADS_KEY),
+ _resourceManager.getNumQueryWorkerThreads(), e);
+ return;
+ }
+ } else {
+ newWorkerThreads = ResourceManager.DEFAULT_QUERY_WORKER_THREADS;
+ LOGGER.info("Key '{}' was removed from cluster config. Reverting to
system default: {}",
+ QUERY_WORKER_THREADS_KEY, newWorkerThreads);
+ }
+ }
+
+ LOGGER.info("Cluster config change detected. Resizing query scheduler
thread pools: runner={}, worker={}",
+ newRunnerThreads, newWorkerThreads);
+ _resourceManager.resizeThreadPools(newRunnerThreads, newWorkerThreads);
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
index fb7ffd93e3f..dcc25ac4e06 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java
@@ -32,13 +32,19 @@ import org.slf4j.LoggerFactory;
*/
public class BinaryWorkloadResourceManager extends ResourceManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(BinaryWorkloadResourceManager.class);
- private final ResourceLimitPolicy _secondaryWorkloadPolicy;
+ // Volatile to ensure visibility across query threads when the policy is
replaced by onThreadPoolsResized()
+ private volatile ResourceLimitPolicy _secondaryWorkloadPolicy;
public BinaryWorkloadResourceManager(PinotConfiguration config) {
super(config);
_secondaryWorkloadPolicy = new ResourceLimitPolicy(config,
_numQueryWorkerThreads);
}
+ @Override
+ protected void onThreadPoolsResized(int newRunnerThreads, int
newWorkerThreads) {
+ _secondaryWorkloadPolicy = new ResourceLimitPolicy(_config,
newWorkerThreads);
+ }
+
/**
* Returns an executor service that query executor can use like a dedicated
* service for submitting jobs for parallel execution.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
index dd2c942c565..8e32a3f2c77 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManager.java
@@ -32,13 +32,18 @@ import org.slf4j.LoggerFactory;
public class PolicyBasedResourceManager extends ResourceManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(PolicyBasedResourceManager.class);
- private final ResourceLimitPolicy _resourcePolicy;
+ private volatile ResourceLimitPolicy _resourcePolicy;
public PolicyBasedResourceManager(PinotConfiguration config) {
super(config);
_resourcePolicy = new ResourceLimitPolicy(config, _numQueryWorkerThreads);
}
+ @Override
+ protected void onThreadPoolsResized(int newRunnerThreads, int
newWorkerThreads) {
+ _resourcePolicy = new ResourceLimitPolicy(_config, newWorkerThreads);
+ }
+
/**
* Returns an executor service that query executor can use like a dedicated
* service for submitting jobs for parallel execution.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
index 4f156b52feb..cc3813ec5aa 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/ResourceManager.java
@@ -21,9 +21,12 @@ package org.apache.pinot.core.query.scheduler.resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.pinot.core.executor.ThrottleOnCriticalHeapUsageExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
@@ -59,6 +62,14 @@ public abstract class ResourceManager {
DEFAULT_QUERY_WORKER_THREADS = 2 * numCores;
}
+ /**
+ * Listener notified after thread pools have been resized.
+ */
+ @FunctionalInterface
+ public interface ThreadPoolResizeListener {
+ void onThreadPoolsResized(int newRunnerThreads, int newWorkerThreads);
+ }
+
// set the main query runner priority higher than NORM but lower than MAX
// because if a query is complete we want to deserialize and return response
as soon
// as possible
@@ -67,15 +78,21 @@ public abstract class ResourceManager {
// including planning, distributing operators across threads, waiting and
// reducing the results from the parallel set of operators (CombineOperator)
//
+ protected final PinotConfiguration _config;
protected final ListeningExecutorService _queryRunners;
protected final ListeningExecutorService _queryWorkers;
- protected final int _numQueryRunnerThreads;
- protected final int _numQueryWorkerThreads;
+ protected final ThreadPoolExecutor _queryRunnerPool;
+ protected final ThreadPoolExecutor _queryWorkerPool;
+ protected volatile int _numQueryRunnerThreads;
+ protected volatile int _numQueryWorkerThreads;
+
+ private final List<ThreadPoolResizeListener> _resizeListeners = new
CopyOnWriteArrayList<>();
/**
* @param config configuration for initializing resource manager
*/
public ResourceManager(PinotConfiguration config) {
+ _config = config;
_numQueryRunnerThreads = config.getProperty(QUERY_RUNNER_CONFIG_KEY,
DEFAULT_QUERY_RUNNER_THREADS);
_numQueryWorkerThreads = config.getProperty(QUERY_WORKER_CONFIG_KEY,
DEFAULT_QUERY_WORKER_THREADS);
@@ -85,20 +102,94 @@ public abstract class ResourceManager {
ThreadFactory queryRunnerFactory = new
TracedThreadFactory(QUERY_RUNNER_THREAD_PRIORITY, false,
CommonConstants.ExecutorService.PINOT_QUERY_RUNNER_NAME_FORMAT);
- ExecutorService runnerService =
Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
- runnerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
- runnerService, config, "query runner");
+ _queryRunnerPool = (ThreadPoolExecutor)
Executors.newFixedThreadPool(_numQueryRunnerThreads, queryRunnerFactory);
+ ExecutorService runnerService =
ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+ _queryRunnerPool, config, "query runner");
_queryRunners = MoreExecutors.listeningDecorator(runnerService);
// pqw -> pinot query workers
ThreadFactory queryWorkersFactory = new
TracedThreadFactory(Thread.NORM_PRIORITY, false,
CommonConstants.ExecutorService.PINOT_QUERY_WORKER_NAME_FORMAT);
- ExecutorService workerService =
Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
- workerService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
- workerService, config, "query worker");
+ _queryWorkerPool = (ThreadPoolExecutor)
Executors.newFixedThreadPool(_numQueryWorkerThreads, queryWorkersFactory);
+ ExecutorService workerService =
ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
+ _queryWorkerPool, config, "query worker");
_queryWorkers = MoreExecutors.listeningDecorator(workerService);
}
+ /**
+ * Dynamically resizes the query runner and worker thread pools. Resizing is
performed on the underlying
+ * {@link ThreadPoolExecutor} instances, which is transparent to any
decorator wrappers.
+ *
+ * @param newRunnerThreads desired number of query runner threads (must be
> 0)
+ * @param newWorkerThreads desired number of query worker threads (must be
> 0)
+ */
+ public synchronized void resizeThreadPools(int newRunnerThreads, int
newWorkerThreads) {
+ if (newRunnerThreads <= 0 || newWorkerThreads <= 0) {
+ LOGGER.warn("Invalid thread pool sizes: runnerThreads={},
workerThreads={}. Sizes must be > 0. Skipping resize.",
+ newRunnerThreads, newWorkerThreads);
+ return;
+ }
+
+ int oldRunnerThreads = _numQueryRunnerThreads;
+ int oldWorkerThreads = _numQueryWorkerThreads;
+
+ if (oldRunnerThreads == newRunnerThreads && oldWorkerThreads ==
newWorkerThreads) {
+ LOGGER.debug("Thread pool sizes unchanged (runner={}, worker={}).
Skipping resize.",
+ newRunnerThreads, newWorkerThreads);
+ return;
+ }
+
+ resizePool(_queryRunnerPool, oldRunnerThreads, newRunnerThreads,
"queryRunner");
+ _numQueryRunnerThreads = newRunnerThreads;
+
+ resizePool(_queryWorkerPool, oldWorkerThreads, newWorkerThreads,
"queryWorker");
+ _numQueryWorkerThreads = newWorkerThreads;
+
+ LOGGER.info("Resized thread pools: runner {} -> {}, worker {} -> {}",
+ oldRunnerThreads, newRunnerThreads, oldWorkerThreads,
newWorkerThreads);
+
+ onThreadPoolsResized(newRunnerThreads, newWorkerThreads);
+ for (ThreadPoolResizeListener listener : _resizeListeners) {
+ listener.onThreadPoolsResized(newRunnerThreads, newWorkerThreads);
+ }
+ }
+
+ /**
+ * Registers a listener to be notified after thread pools are resized.
+ */
+ public void addThreadPoolResizeListener(ThreadPoolResizeListener listener) {
+ _resizeListeners.add(listener);
+ }
+
+ /**
+ * Hook for subclasses to update dependent state (e.g. resource limit
policies) after thread pools are resized.
+ * Called before external listeners are notified.
+ */
+ protected void onThreadPoolsResized(int newRunnerThreads, int
newWorkerThreads) {
+ }
+
+ private synchronized void resizePool(ThreadPoolExecutor pool, int oldSize,
int newSize, String poolName) {
+ if (oldSize == newSize) {
+ return;
+ }
+ // Scale up:
+ // Increase maximumPoolSize first, then corePoolSize.
+ // If corePoolSize is increased first, it may temporarily exceed
maximumPoolSize,
+ // which would cause an IllegalArgumentException.
+ if (newSize > oldSize) {
+ pool.setMaximumPoolSize(newSize);
+ pool.setCorePoolSize(newSize);
+ } else {
+ // Scale down:
+ // Decrease corePoolSize first, then maximumPoolSize.
+ // If maximumPoolSize is decreased first, it may become smaller than
corePoolSize,
+ // which would also cause an IllegalArgumentException.
+ pool.setCorePoolSize(newSize);
+ pool.setMaximumPoolSize(newSize);
+ }
+ LOGGER.info("Resized {} pool: {} -> {}", poolName, oldSize, newSize);
+ }
+
public void stop() {
_queryWorkers.shutdownNow();
_queryRunners.shutdownNow();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index fa7a2a0584d..141fcf89863 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -235,6 +235,32 @@ public class PrioritySchedulerTest {
scheduler.stop();
}
+ @Test
+ public void testResizeUpdatesSemaphoreIncrease() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(ResourceManager.QUERY_RUNNER_CONFIG_KEY, 4);
+ properties.put(ResourceManager.QUERY_WORKER_CONFIG_KEY, 8);
+ TestPriorityScheduler scheduler = TestPriorityScheduler.create(new
PinotConfiguration(properties));
+ assertEquals(scheduler.getRunningQueriesSemaphore().availablePermits(), 4);
+
+ scheduler.getResourceManager().resizeThreadPools(8, 16);
+ assertEquals(scheduler.getRunningQueriesSemaphore().availablePermits(), 8);
+ scheduler.stop();
+ }
+
+ @Test
+ public void testResizeUpdatesSemaphoreDecrease() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(ResourceManager.QUERY_RUNNER_CONFIG_KEY, 8);
+ properties.put(ResourceManager.QUERY_WORKER_CONFIG_KEY, 16);
+ TestPriorityScheduler scheduler = TestPriorityScheduler.create(new
PinotConfiguration(properties));
+ assertEquals(scheduler.getRunningQueriesSemaphore().availablePermits(), 8);
+
+ scheduler.getResourceManager().resizeThreadPools(4, 8);
+ assertEquals(scheduler.getRunningQueriesSemaphore().availablePermits(), 4);
+ scheduler.stop();
+ }
+
static class TestPriorityScheduler extends PriorityScheduler {
static TestSchedulerGroupFactory _groupFactory;
static LongAccumulator _latestQueryTime;
@@ -261,7 +287,8 @@ public class PrioritySchedulerTest {
return create(new PinotConfiguration());
}
- ResourceManager getResourceManager() {
+ @Override
+ public ResourceManager getResourceManager() {
return _resourceManager;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerThreadPoolConfigChangeListenerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerThreadPoolConfigChangeListenerTest.java
new file mode 100644
index 00000000000..5eb4fa78fe6
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerThreadPoolConfigChangeListenerTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class QuerySchedulerThreadPoolConfigChangeListenerTest {
+
+ @Test
+ public void testOnChangeWithRelevantKeys() {
+ ResourceManager resourceManager = mock(ResourceManager.class);
+ when(resourceManager.getNumQueryRunnerThreads()).thenReturn(4);
+ when(resourceManager.getNumQueryWorkerThreads()).thenReturn(8);
+
+ QuerySchedulerThreadPoolConfigChangeListener listener =
+ new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+
clusterConfigs.put(QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY,
"10");
+
clusterConfigs.put(QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY,
"20");
+
+ Set<String> changedConfigs = Set.of(
+ QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY,
+ QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY);
+
+ listener.onChange(changedConfigs, clusterConfigs);
+
+ verify(resourceManager).resizeThreadPools(10, 20);
+ }
+
+ @Test
+ public void testOnChangeRunnerOnly() {
+ ResourceManager resourceManager = mock(ResourceManager.class);
+ when(resourceManager.getNumQueryRunnerThreads()).thenReturn(4);
+ when(resourceManager.getNumQueryWorkerThreads()).thenReturn(8);
+
+ QuerySchedulerThreadPoolConfigChangeListener listener =
+ new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+
clusterConfigs.put(QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY,
"10");
+
+ Set<String> changedConfigs = Set.of(
+ QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY);
+
+ listener.onChange(changedConfigs, clusterConfigs);
+
+ verify(resourceManager).resizeThreadPools(10, 8);
+ }
+
+ @Test
+ public void testOnChangeWorkerOnly() {
+ ResourceManager resourceManager = mock(ResourceManager.class);
+ when(resourceManager.getNumQueryRunnerThreads()).thenReturn(4);
+ when(resourceManager.getNumQueryWorkerThreads()).thenReturn(8);
+
+ QuerySchedulerThreadPoolConfigChangeListener listener =
+ new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+
clusterConfigs.put(QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY,
"20");
+
+ Set<String> changedConfigs = Set.of(
+ QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY);
+
+ listener.onChange(changedConfigs, clusterConfigs);
+
+ verify(resourceManager).resizeThreadPools(4, 20);
+ }
+
+ @Test
+ public void testOnChangeUnrelatedKeys() {
+ ResourceManager resourceManager = mock(ResourceManager.class);
+
+ QuerySchedulerThreadPoolConfigChangeListener listener =
+ new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+ clusterConfigs.put("some.other.config", "value");
+
+ Set<String> changedConfigs = Set.of("some.other.config");
+
+ listener.onChange(changedConfigs, clusterConfigs);
+
+ verify(resourceManager, never()).resizeThreadPools(anyInt(), anyInt());
+ }
+
+ @Test
+ public void testOnChangeInvalidRunnerValue() {
+ ResourceManager resourceManager = mock(ResourceManager.class);
+ when(resourceManager.getNumQueryRunnerThreads()).thenReturn(4);
+ when(resourceManager.getNumQueryWorkerThreads()).thenReturn(8);
+
+ QuerySchedulerThreadPoolConfigChangeListener listener =
+ new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+
clusterConfigs.put(QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY,
"not_a_number");
+
+ Set<String> changedConfigs = Set.of(
+ QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY);
+
+ listener.onChange(changedConfigs, clusterConfigs);
+
+ verify(resourceManager, never()).resizeThreadPools(anyInt(), anyInt());
+ }
+
+ @Test
+ public void testOnChangeInvalidWorkerValue() {
+ ResourceManager resourceManager = mock(ResourceManager.class);
+ when(resourceManager.getNumQueryRunnerThreads()).thenReturn(4);
+ when(resourceManager.getNumQueryWorkerThreads()).thenReturn(8);
+
+ QuerySchedulerThreadPoolConfigChangeListener listener =
+ new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+
clusterConfigs.put(QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY,
"invalid");
+
+ Set<String> changedConfigs = Set.of(
+ QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY);
+
+ listener.onChange(changedConfigs, clusterConfigs);
+
+ verify(resourceManager, never()).resizeThreadPools(anyInt(), anyInt());
+ }
+
+ @Test
+ public void testOnChangeDeletedRunnerKeyRevertsToDefault() {
+ ResourceManager resourceManager = mock(ResourceManager.class);
+ when(resourceManager.getNumQueryRunnerThreads()).thenReturn(32);
+ when(resourceManager.getNumQueryWorkerThreads()).thenReturn(8);
+
+ QuerySchedulerThreadPoolConfigChangeListener listener =
+ new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+
+ Set<String> changedConfigs = Set.of(
+ QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY);
+
+ listener.onChange(changedConfigs, clusterConfigs);
+
+
verify(resourceManager).resizeThreadPools(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
8);
+ }
+
+ @Test
+ public void testOnChangeDeletedWorkerKeyRevertsToDefault() {
+ ResourceManager resourceManager = mock(ResourceManager.class);
+ when(resourceManager.getNumQueryRunnerThreads()).thenReturn(4);
+ when(resourceManager.getNumQueryWorkerThreads()).thenReturn(64);
+
+ QuerySchedulerThreadPoolConfigChangeListener listener =
+ new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+
+ Set<String> changedConfigs = Set.of(
+ QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY);
+
+ listener.onChange(changedConfigs, clusterConfigs);
+
+ verify(resourceManager).resizeThreadPools(4,
ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
+ }
+
+ @Test
+ public void testOnChangeBothKeysDeletedRevertToDefaults() {
+ ResourceManager resourceManager = mock(ResourceManager.class);
+ when(resourceManager.getNumQueryRunnerThreads()).thenReturn(32);
+ when(resourceManager.getNumQueryWorkerThreads()).thenReturn(64);
+
+ QuerySchedulerThreadPoolConfigChangeListener listener =
+ new QuerySchedulerThreadPoolConfigChangeListener(resourceManager);
+
+ Map<String, String> clusterConfigs = new HashMap<>();
+
+ Set<String> changedConfigs = Set.of(
+ QuerySchedulerThreadPoolConfigChangeListener.QUERY_RUNNER_THREADS_KEY,
+ QuerySchedulerThreadPoolConfigChangeListener.QUERY_WORKER_THREADS_KEY);
+
+ listener.onChange(changedConfigs, clusterConfigs);
+
+ verify(resourceManager).resizeThreadPools(
+ ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManagerTest.java
new file mode 100644
index 00000000000..a8d01ea344b
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/PolicyBasedResourceManagerTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler.resources;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+
+
+public class PolicyBasedResourceManagerTest {
+ private PolicyBasedResourceManager _resourceManager;
+
+ @AfterMethod
+ public void tearDown() {
+ if (_resourceManager != null) {
+ _resourceManager.stop();
+ _resourceManager = null;
+ }
+ }
+
+ @Test
+ public void testResizeUpdatesResourceLimitPolicy() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(ResourceManager.QUERY_RUNNER_CONFIG_KEY, 4);
+ properties.put(ResourceManager.QUERY_WORKER_CONFIG_KEY, 20);
+ properties.put(ResourceLimitPolicy.TABLE_THREADS_HARD_LIMIT, 50);
+ properties.put(ResourceLimitPolicy.TABLE_THREADS_SOFT_LIMIT, 30);
+ _resourceManager = new PolicyBasedResourceManager(new
PinotConfiguration(properties));
+
+ int originalHardLimit = _resourceManager.getTableThreadsHardLimit();
+ int originalSoftLimit = _resourceManager.getTableThreadsSoftLimit();
+
+ _resourceManager.resizeThreadPools(8, 40);
+
+ assertNotEquals(_resourceManager.getTableThreadsHardLimit(),
originalHardLimit);
+ assertNotEquals(_resourceManager.getTableThreadsSoftLimit(),
originalSoftLimit);
+ assertEquals(_resourceManager.getNumQueryWorkerThreads(), 40);
+ }
+
+ @Test
+ public void testResizeDoesNotUpdatePolicyOnNoChange() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(ResourceManager.QUERY_RUNNER_CONFIG_KEY, 4);
+ properties.put(ResourceManager.QUERY_WORKER_CONFIG_KEY, 20);
+ _resourceManager = new PolicyBasedResourceManager(new
PinotConfiguration(properties));
+
+ int originalHardLimit = _resourceManager.getTableThreadsHardLimit();
+ int originalSoftLimit = _resourceManager.getTableThreadsSoftLimit();
+
+ _resourceManager.resizeThreadPools(4, 20);
+
+ assertEquals(_resourceManager.getTableThreadsHardLimit(),
originalHardLimit);
+ assertEquals(_resourceManager.getTableThreadsSoftLimit(),
originalSoftLimit);
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
index 0b8feda8ba6..d0b34b0f849 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/resources/ResourceManagerTest.java
@@ -20,29 +20,130 @@ package org.apache.pinot.core.query.scheduler.resources;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
public class ResourceManagerTest {
+ private ResourceManager _resourceManager;
+
+ @AfterMethod
+ public void tearDown() { // Runs after each test method and stops the manager that test stored in the field.
+ if (_resourceManager != null) { // null when the test did not create a manager
+ _resourceManager.stop();
+ _resourceManager = null; // so the same instance is not stopped again after the next test
+ }
+ }
@Test
public void testCanSchedule() {
- ResourceManager rm = getResourceManager(2, 5, 1, 3);
+ _resourceManager = getResourceManager(2, 5, 1, 3); // kept in the field so tearDown() can stop it; args are (runners, workers, softLimit, hardLimit)
SchedulerGroupAccountant accountant = mock(SchedulerGroupAccountant.class);
when(accountant.totalReservedThreads()).thenReturn(3); // reserved threads equal the hardLimit argument
- assertFalse(rm.canSchedule(accountant));
+ assertFalse(_resourceManager.canSchedule(accountant));
when(accountant.totalReservedThreads()).thenReturn(2); // one below the hardLimit argument
- assertTrue(rm.canSchedule(accountant));
+ assertTrue(_resourceManager.canSchedule(accountant));
+ }
+
+ @Test
+ public void testResizeThreadPoolsIncrease() { // Growing 2/4 -> 4/8 must update the reported counts and both executors.
+ _resourceManager = getResourceManager(2, 4, 1, 3);
+ assertEquals(_resourceManager.getNumQueryRunnerThreads(), 2);
+ assertEquals(_resourceManager.getNumQueryWorkerThreads(), 4);
+
+ _resourceManager.resizeThreadPools(4, 8); // (runner threads, worker threads)
+ assertEquals(_resourceManager.getNumQueryRunnerThreads(), 4);
+ assertEquals(_resourceManager.getNumQueryWorkerThreads(), 8);
+ assertEquals(_resourceManager._queryRunnerPool.getCorePoolSize(), 4); // core and maximum size are both expected to equal the new count
+ assertEquals(_resourceManager._queryRunnerPool.getMaximumPoolSize(), 4);
+ assertEquals(_resourceManager._queryWorkerPool.getCorePoolSize(), 8);
+ assertEquals(_resourceManager._queryWorkerPool.getMaximumPoolSize(), 8);
+ }
+
+ @Test
+ public void testResizeThreadPoolsDecrease() { // Shrinking 8/16 -> 4/8 must update the reported counts and both executors.
+ _resourceManager = getResourceManager(8, 16, 1, 3);
+ assertEquals(_resourceManager.getNumQueryRunnerThreads(), 8);
+ assertEquals(_resourceManager.getNumQueryWorkerThreads(), 16);
+
+ _resourceManager.resizeThreadPools(4, 8); // (runner threads, worker threads)
+ assertEquals(_resourceManager.getNumQueryRunnerThreads(), 4);
+ assertEquals(_resourceManager.getNumQueryWorkerThreads(), 8);
+ assertEquals(_resourceManager._queryRunnerPool.getCorePoolSize(), 4); // core and maximum size are both expected to equal the new count
+ assertEquals(_resourceManager._queryRunnerPool.getMaximumPoolSize(), 4);
+ assertEquals(_resourceManager._queryWorkerPool.getCorePoolSize(), 8);
+ assertEquals(_resourceManager._queryWorkerPool.getMaximumPoolSize(), 8);
+ }
+
+ @Test
+ public void testResizeThreadPoolsNoChange() { // Resizing to the sizes already in effect must keep the reported counts as they are.
+ _resourceManager = getResourceManager(2, 4, 1, 3);
+ _resourceManager.resizeThreadPools(2, 4); // identical to the initial (runner, worker) sizes
+ assertEquals(_resourceManager.getNumQueryRunnerThreads(), 2);
+ assertEquals(_resourceManager.getNumQueryWorkerThreads(), 4);
+ }
+
+ @Test
+ public void testResizeThreadPoolsInvalidValues() { // A zero or negative size must be ignored: both counts stay at 2/4.
+ _resourceManager = getResourceManager(2, 4, 1, 3);
+
+ _resourceManager.resizeThreadPools(0, 4); // invalid runner count, valid worker count
+ assertEquals(_resourceManager.getNumQueryRunnerThreads(), 2);
+ assertEquals(_resourceManager.getNumQueryWorkerThreads(), 4);
+
+ _resourceManager.resizeThreadPools(2, -1); // valid runner count, invalid worker count
+ assertEquals(_resourceManager.getNumQueryRunnerThreads(), 2);
+ assertEquals(_resourceManager.getNumQueryWorkerThreads(), 4);
+ }
+
+ @Test
+ public void testResizeListenerCalledOnResize() { // A registered listener must receive the new runner and worker counts.
+ _resourceManager = getResourceManager(2, 4, 1, 3);
+ AtomicInteger capturedRunners = new AtomicInteger(-1); // -1 is a sentinel: it stays -1 if the listener never runs
+ AtomicInteger capturedWorkers = new AtomicInteger(-1);
+ _resourceManager.addThreadPoolResizeListener((newRunnerThreads,
newWorkerThreads) -> {
+ capturedRunners.set(newRunnerThreads);
+ capturedWorkers.set(newWorkerThreads);
+ });
+
+ _resourceManager.resizeThreadPools(6, 12);
+ assertEquals(capturedRunners.get(), 6); // checked right after the call, so the listener is expected to have run before it returned
+ assertEquals(capturedWorkers.get(), 12);
+ }
+
+ @Test
+ public void testResizeListenerNotCalledOnNoChange() { // A resize to the current sizes must not notify listeners.
+ _resourceManager = getResourceManager(2, 4, 1, 3);
+ AtomicInteger callCount = new AtomicInteger(0); // counts listener invocations
+ _resourceManager.addThreadPoolResizeListener((newRunnerThreads,
newWorkerThreads) -> callCount.incrementAndGet());
+
+ _resourceManager.resizeThreadPools(2, 4); // identical to the initial (runner, worker) sizes
+ assertEquals(callCount.get(), 0);
+ }
+
+ @Test
+ public void testResizeListenerNotCalledOnInvalidValues() { // A rejected (zero or negative) resize must not notify listeners.
+ _resourceManager = getResourceManager(2, 4, 1, 3);
+ AtomicInteger callCount = new AtomicInteger(0); // counts listener invocations
+ _resourceManager.addThreadPoolResizeListener((newRunnerThreads,
newWorkerThreads) -> callCount.incrementAndGet());
+
+ _resourceManager.resizeThreadPools(0, 4); // invalid runner count
+ assertEquals(callCount.get(), 0);
+
+ _resourceManager.resizeThreadPools(2, -1); // invalid worker count
+ assertEquals(callCount.get(), 0);
}
private ResourceManager getResourceManager(int runners, int workers, final
int softLimit, final int hardLimit) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
index edb8da4e2bd..faabf605154 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
@@ -19,8 +19,11 @@
package org.apache.pinot.segment.local.realtime.impl.invertedindex;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.pinot.common.utils.ScalingThreadPoolExecutor;
import org.apache.pinot.spi.query.QueryThreadContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -29,18 +32,25 @@ import org.apache.pinot.spi.query.QueryThreadContext;
* an accompanying Lucene searcher thread if needed. init() is called in
BaseServerStarter to avoid creating a
* dependency on pinot-core.
*
- * The executor is wrapped with
QueryThreadContext.contextAwareExecutorService(executor, false) to propagate
+ * <p>The pool supports dynamic resizing via {@link #resize(int)} so that it
stays in sync when
+ * query_worker_threads is changed at runtime through cluster config.
+ *
+ * <p>The executor is wrapped with
QueryThreadContext.contextAwareExecutorService(executor, false) to propagate
* QueryThreadContext for CPU/memory tracking, but WITHOUT registering tasks
for cancellation. This prevents
* Thread.interrupt() during Lucene search which could corrupt FSDirectory
used by IndexWriter.
* See https://github.com/apache/lucene/issues/3315 and
https://github.com/apache/lucene/issues/9309
*/
public class RealtimeLuceneTextIndexSearcherPool {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeLuceneTextIndexSearcherPool.class);
+
private static RealtimeLuceneTextIndexSearcherPool _singletonInstance;
- private static ExecutorService _executorService;
+
+ private final ThreadPoolExecutor _baseExecutor; // concrete pool, kept so resize(int) can call setMaximumPoolSize on it
+ private final ExecutorService _executorService; // context-aware wrapper around _baseExecutor; returned by getExecutorService()
private RealtimeLuceneTextIndexSearcherPool(int size) { // private: instances are only created from inside this class
- ExecutorService baseExecutor =
ScalingThreadPoolExecutor.newScalingThreadPool(0, size, 500);
- _executorService =
QueryThreadContext.contextAwareExecutorService(baseExecutor, false);
+ _baseExecutor = (ThreadPoolExecutor)
ScalingThreadPoolExecutor.newScalingThreadPool(0, size, 500); // 0 = core size, size = maximum; 500 is presumably the keep-alive in ms - TODO confirm
+ _executorService =
QueryThreadContext.contextAwareExecutorService(_baseExecutor, false); // false: propagate QueryThreadContext without registering tasks for cancellation
}
public static RealtimeLuceneTextIndexSearcherPool getInstance() {
@@ -58,4 +68,33 @@ public class RealtimeLuceneTextIndexSearcherPool {
public ExecutorService getExecutorService() { // returns the context-aware wrapper, not the raw ThreadPoolExecutor
return _executorService;
}
+
+ /**
+ * Dynamically resizes the maximum pool size of the underlying {@link
ScalingThreadPoolExecutor}.
+ * Since the base executor has corePoolSize=0, only maximumPoolSize needs to
be adjusted.
+ * When shrinking, idle threads are interrupted immediately; busy threads
finish their current
+ * Lucene search task before exiting, so no in-flight search is disrupted.
+ * <p>A non-positive size is logged at WARN and ignored; a size equal to the current maximum is a no-op logged at DEBUG.
+ * @param newMaxSize the new maximum thread count (must be > 0)
+ */
+ public synchronized void resize(int newMaxSize) { // synchronized: concurrent resize calls on this pool are applied one at a time
+ if (newMaxSize <= 0) { // ThreadPoolExecutor.setMaximumPoolSize would throw IllegalArgumentException for these values
+ LOGGER.warn("Invalid Lucene searcher pool size: {}. Must be > 0.
Skipping resize.", newMaxSize);
+ return;
+ }
+ int oldMaxSize = _baseExecutor.getMaximumPoolSize(); // the current size is read from the executor; no separate size field is kept
+ if (oldMaxSize == newMaxSize) {
+ LOGGER.debug("Lucene searcher pool size unchanged at {}. Skipping
resize.", newMaxSize);
+ return;
+ }
+ _baseExecutor.setMaximumPoolSize(newMaxSize);
+ LOGGER.info("Resized Lucene searcher pool: {} -> {}", oldMaxSize,
newMaxSize);
+ }
+
+ /**
+ * Returns the current maximum pool size, read directly from the base executor. Primarily for testing.
+ */
+ public int getMaxPoolSize() {
+ return _baseExecutor.getMaximumPoolSize();
+ }
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPoolTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPoolTest.java
new file mode 100644
index 00000000000..2fcd9beb8e1
--- /dev/null
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPoolTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+public class RealtimeLuceneTextIndexSearcherPoolTest { // Tests init/getInstance and resize(int); every test expects init(n) to return a pool whose max size is n.
+
+ @Test
+ public void testInitAndGetInstance() { // init(4) must yield a usable pool that getInstance() also returns.
+ RealtimeLuceneTextIndexSearcherPool pool =
RealtimeLuceneTextIndexSearcherPool.init(4);
+ assertNotNull(pool);
+ assertEquals(pool.getMaxPoolSize(), 4);
+ assertNotNull(pool.getExecutorService());
+ assertEquals(RealtimeLuceneTextIndexSearcherPool.getInstance(), pool);
+ }
+
+ @Test
+ public void testResizeIncrease() { // Growing 4 -> 8 is reflected by getMaxPoolSize().
+ RealtimeLuceneTextIndexSearcherPool pool =
RealtimeLuceneTextIndexSearcherPool.init(4);
+ assertEquals(pool.getMaxPoolSize(), 4);
+
+ pool.resize(8);
+ assertEquals(pool.getMaxPoolSize(), 8);
+ }
+
+ @Test
+ public void testResizeDecrease() { // Shrinking 8 -> 4 is reflected by getMaxPoolSize().
+ RealtimeLuceneTextIndexSearcherPool pool =
RealtimeLuceneTextIndexSearcherPool.init(8);
+ assertEquals(pool.getMaxPoolSize(), 8);
+
+ pool.resize(4);
+ assertEquals(pool.getMaxPoolSize(), 4);
+ }
+
+ @Test
+ public void testResizeSameSize() { // Resizing to the current size leaves the maximum unchanged.
+ RealtimeLuceneTextIndexSearcherPool pool =
RealtimeLuceneTextIndexSearcherPool.init(4);
+ assertEquals(pool.getMaxPoolSize(), 4);
+
+ // No-op when size is unchanged
+ pool.resize(4);
+ assertEquals(pool.getMaxPoolSize(), 4);
+ }
+
+ @Test
+ public void testResizeInvalidSize() { // Non-positive sizes must not throw and must not change the maximum.
+ RealtimeLuceneTextIndexSearcherPool pool =
RealtimeLuceneTextIndexSearcherPool.init(4);
+ assertEquals(pool.getMaxPoolSize(), 4);
+
+ // Zero and negative values should be ignored
+ pool.resize(0);
+ assertEquals(pool.getMaxPoolSize(), 4);
+
+ pool.resize(-1);
+ assertEquals(pool.getMaxPoolSize(), 4);
+ }
+}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 3f544a96ead..fe16f0685ac 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -85,6 +85,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager;
import
org.apache.pinot.core.data.manager.realtime.ServerRateLimitConfigChangeListener;
import org.apache.pinot.core.instance.context.ServerContext;
+import
org.apache.pinot.core.query.scheduler.QuerySchedulerThreadPoolConfigChangeListener;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.NettyInspector;
@@ -803,6 +804,14 @@ public abstract class BaseServerStarter implements
ServiceStartable {
}
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_segmentOperationsThrottlerSet);
_clusterConfigChangeHandler.registerClusterConfigChangeListener(keepPipelineBreakerStatsPredicate);
+ ResourceManager resourceManager =
_serverInstance.getQueryScheduler().getResourceManager();
+ _clusterConfigChangeHandler.registerClusterConfigChangeListener(
+ new QuerySchedulerThreadPoolConfigChangeListener(resourceManager));
+
+ // Keep the Lucene searcher pool in sync with query_worker_threads changes
+ resourceManager.addThreadPoolResizeListener((newRunnerThreads,
newWorkerThreads) -> {
+ _realtimeLuceneTextIndexSearcherPool.resize(newWorkerThreads);
+ });
if (sendStatsPredicate.needWatchForInstanceConfigChange()) {
LOGGER.info("Initializing and registering the SendStatsPredicate");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]