This is an automated email from the ASF dual-hosted git repository.
shauryachats pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 47c149397b1 Add default.execution.threads server config to decouple
default query parallelism from max (#18622)
47c149397b1 is described below
commit 47c149397b12d555ed83a001845edd38b26c6f5e
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Wed Jun 3 12:28:07 2026 -0700
Add default.execution.threads server config to decouple default query
parallelism from max (#18622)
Currently, `max.execution.threads` serves as both the hard cap and the
implicit default for per-query execution threads. When operators tune it
up, all queries immediately use the higher value.
This introduces `pinot.server.query.executor.default.execution.threads`
so that operators can set a lower default for normal queries while still
allowing selected queries to reach the max via `SET maxExecutionThreads`.
Resolution order:
1. Per-query override (SET maxExecutionThreads=N) — capped by max
2. Server default (default.execution.threads) — capped by max
3. Server max (max.execution.threads) — legacy fallback
When not configured, behavior is unchanged.
Co-authored-by: Cursor <[email protected]>
---
.../core/plan/maker/InstancePlanMakerImplV2.java | 43 +++++++--
.../plan/maker/InstancePlanMakerImplV2Test.java | 104 +++++++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 4 +
3 files changed, 145 insertions(+), 6 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 371ee1874b9..2980ff7ee2a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -105,6 +105,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
private final FetchPlanner _fetchPlanner = FetchPlannerRegistry.getPlanner();
private int _maxExecutionThreads =
Server.DEFAULT_QUERY_EXECUTOR_MAX_EXECUTION_THREADS;
+ private int _defaultExecutionThreads =
Server.DEFAULT_QUERY_EXECUTOR_DEFAULT_EXECUTION_THREADS;
private int _maxInitialResultHolderCapacity =
Server.DEFAULT_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
private int _minInitialIndexedTableCapacity =
Server.DEFAULT_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
// Limit on number of groups stored for each segment, beyond which no new
group will be created
@@ -120,6 +121,9 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
public void init(PinotConfiguration queryExecutorConfig) {
_maxExecutionThreads =
queryExecutorConfig.getProperty(Server.MAX_EXECUTION_THREADS,
Server.DEFAULT_QUERY_EXECUTOR_MAX_EXECUTION_THREADS);
+ _defaultExecutionThreads =
queryExecutorConfig.getProperty(Server.DEFAULT_EXECUTION_THREADS,
+ Server.DEFAULT_QUERY_EXECUTOR_DEFAULT_EXECUTION_THREADS);
+ validateExecutionThreadConfig();
_maxInitialResultHolderCapacity =
queryExecutorConfig.getProperty(Server.MAX_INITIAL_RESULT_HOLDER_CAPACITY,
Server.DEFAULT_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
_minInitialIndexedTableCapacity =
queryExecutorConfig.getProperty(Server.MIN_INITIAL_INDEXED_TABLE_CAPACITY,
@@ -142,10 +146,31 @@ public class InstancePlanMakerImplV2 implements PlanMaker
{
Server.DEFAULT_QUERY_EXECUTOR_GROUPBY_TRIM_THRESHOLD);
Preconditions.checkState(_groupByTrimThreshold > 0,
"Invalid configurable: groupByTrimThreshold: %d must be positive",
_groupByTrimThreshold);
- LOGGER.info("Initialized plan maker with maxExecutionThreads: {},
maxInitialResultHolderCapacity: {}, "
- + "numGroupsLimit: {}, minSegmentGroupTrimSize: {},
minServerGroupTrimSize: {}, groupByTrimThreshold: {}",
- _maxExecutionThreads, _maxInitialResultHolderCapacity,
_numGroupsLimit, _minSegmentGroupTrimSize,
- _minServerGroupTrimSize, _groupByTrimThreshold);
+ LOGGER.info("Initialized plan maker with maxExecutionThreads: {},
defaultExecutionThreads: {}, "
+ + "maxInitialResultHolderCapacity: {}, numGroupsLimit: {},
minSegmentGroupTrimSize: {}, "
+ + "minServerGroupTrimSize: {}, groupByTrimThreshold: {}",
+ _maxExecutionThreads, _defaultExecutionThreads,
_maxInitialResultHolderCapacity, _numGroupsLimit,
+ _minSegmentGroupTrimSize, _minServerGroupTrimSize,
_groupByTrimThreshold);
+ }
+
+ @VisibleForTesting
+ public void setMaxExecutionThreads(int maxExecutionThreads) {
+ _maxExecutionThreads = maxExecutionThreads;
+ validateExecutionThreadConfig();
+ }
+
+ @VisibleForTesting
+ public void setDefaultExecutionThreads(int defaultExecutionThreads) {
+ _defaultExecutionThreads = defaultExecutionThreads;
+ validateExecutionThreadConfig();
+ }
+
+ private void validateExecutionThreadConfig() {
+ if (_defaultExecutionThreads > 0 && _maxExecutionThreads > 0) {
+ Preconditions.checkState(_defaultExecutionThreads <=
_maxExecutionThreads,
+ "Invalid configuration: defaultExecutionThreads: %d must be <=
maxExecutionThreads: %d",
+ _defaultExecutionThreads, _maxExecutionThreads);
+ }
}
@VisibleForTesting
@@ -208,7 +233,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
new InstanceResponsePlanNode(combinePlanNode, segmentContexts,
fetchContexts, queryContext));
}
- private void applyQueryOptions(QueryContext queryContext) {
+ @VisibleForTesting
+ void applyQueryOptions(QueryContext queryContext) {
Map<String, String> queryOptions = queryContext.getQueryOptions();
// Set skipUpsert
@@ -227,15 +253,20 @@ public class InstancePlanMakerImplV2 implements PlanMaker
{
queryContext.setSkipIndexes(QueryOptionsUtils.getSkipIndexes(queryOptions));
// Set maxExecutionThreads
+ // Resolution order:
+ // 1. Per-query override (SET maxExecutionThreads=N) — capped by server
max
+ // 2. Server-level default (default.execution.threads) — decoupled from
max, but still capped by it
+ // 3. Server-level max (max.execution.threads) — legacy fallback
int maxExecutionThreads;
Integer maxExecutionThreadsFromQuery =
QueryOptionsUtils.getMaxExecutionThreads(queryOptions);
if (maxExecutionThreadsFromQuery != null) {
- // Do not allow query to override the execution threads over the
instance-level limit
if (_maxExecutionThreads > 0) {
maxExecutionThreads = Math.min(_maxExecutionThreads,
maxExecutionThreadsFromQuery);
} else {
maxExecutionThreads = maxExecutionThreadsFromQuery;
}
+ } else if (_defaultExecutionThreads > 0) {
+ maxExecutionThreads = _defaultExecutionThreads;
} else {
maxExecutionThreads = _maxExecutionThreads;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2Test.java
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2Test.java
new file mode 100644
index 00000000000..18cba523a35
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2Test.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan.maker;
+
+import java.util.Map;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Tests for execution-thread resolution in {@link InstancePlanMakerImplV2},
covering the interplay
+ * between {@code max.execution.threads}, {@code default.execution.threads},
and per-query overrides.
+ */
+public class InstancePlanMakerImplV2Test {
+
+ private static final String BASE_QUERY = "SELECT * FROM testTable";
+
+ private static QueryContext buildQueryContext(String
maxExecutionThreadsOption) {
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext(BASE_QUERY);
+ if (maxExecutionThreadsOption != null) {
+ Map<String, String> queryOptions = queryContext.getQueryOptions();
+ queryOptions.put(QueryOptionKey.MAX_EXECUTION_THREADS,
maxExecutionThreadsOption);
+ }
+ return queryContext;
+ }
+
+ @Test
+ public void testDefaultNotSetFallsBackToMax() {
+ InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+ planMaker.setMaxExecutionThreads(12);
+
+ QueryContext queryContext = buildQueryContext(null);
+ planMaker.applyQueryOptions(queryContext);
+
+ assertEquals(queryContext.getMaxExecutionThreads(), 12);
+ }
+
+ @Test
+ public void testDefaultExecutionThreadsUsedWhenSet() {
+ InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+ planMaker.setMaxExecutionThreads(16);
+ planMaker.setDefaultExecutionThreads(4);
+
+ QueryContext queryContext = buildQueryContext(null);
+ planMaker.applyQueryOptions(queryContext);
+
+ assertEquals(queryContext.getMaxExecutionThreads(), 4);
+ }
+
+ @Test
+ public void testQueryOverrideOverridesDefault() {
+ InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+ planMaker.setMaxExecutionThreads(16);
+ planMaker.setDefaultExecutionThreads(4);
+
+ QueryContext queryContext = buildQueryContext("10");
+ planMaker.applyQueryOptions(queryContext);
+
+ assertEquals(queryContext.getMaxExecutionThreads(), 10);
+ }
+
+ @Test
+ public void testQueryOverrideCappedByMax() {
+ InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+ planMaker.setMaxExecutionThreads(8);
+ planMaker.setDefaultExecutionThreads(4);
+
+ QueryContext queryContext = buildQueryContext("20");
+ planMaker.applyQueryOptions(queryContext);
+
+ assertEquals(queryContext.getMaxExecutionThreads(), 8);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class)
+ public void testInitRejectsDefaultExceedingMax() {
+ InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty(Server.MAX_EXECUTION_THREADS, 4);
+ config.setProperty(Server.DEFAULT_EXECUTION_THREADS, 8);
+ planMaker.init(config);
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 8111f976912..ecd52695a2e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1323,6 +1323,10 @@ public class CommonConstants {
public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_EXECUTION_THREADS =
QUERY_EXECUTOR_CONFIG_PREFIX + "." + MAX_EXECUTION_THREADS;
public static final int DEFAULT_QUERY_EXECUTOR_MAX_EXECUTION_THREADS = -1;
// Use number of CPU cores
+ public static final String DEFAULT_EXECUTION_THREADS =
"default.execution.threads";
+ public static final String
CONFIG_OF_QUERY_EXECUTOR_DEFAULT_EXECUTION_THREADS =
+ QUERY_EXECUTOR_CONFIG_PREFIX + "." + DEFAULT_EXECUTION_THREADS;
+ public static final int DEFAULT_QUERY_EXECUTOR_DEFAULT_EXECUTION_THREADS =
-1; // Not set; fall back to max
// OOM protection: heap usage throttle configuration
public static final String CONFIG_OF_HEAP_USAGE_THROTTLE_QUEUE_MAX_SIZE =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]