This is an automated email from the ASF dual-hosted git repository.

shauryachats pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 47c149397b1 Add default.execution.threads server config to decouple 
default query parallelism from max (#18622)
47c149397b1 is described below

commit 47c149397b12d555ed83a001845edd38b26c6f5e
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Wed Jun 3 12:28:07 2026 -0700

    Add default.execution.threads server config to decouple default query 
parallelism from max (#18622)
    
    Currently, `max.execution.threads` serves as both the hard cap and the
    implicit default for per-query execution threads. When operators tune it
    up, all queries immediately use the higher value.
    
    This introduces `pinot.server.query.executor.default.execution.threads`
    so that operators can set a lower default for normal queries while still
    allowing selected queries to reach the max via `SET maxExecutionThreads`.
    
    Resolution order:
      1. Per-query override (SET maxExecutionThreads=N) — capped by max
      2. Server default (default.execution.threads) — capped by max
      3. Server max (max.execution.threads) — legacy fallback
    
    When not configured, behavior is unchanged.
    
    Co-authored-by: Cursor <[email protected]>
---
 .../core/plan/maker/InstancePlanMakerImplV2.java   |  43 +++++++--
 .../plan/maker/InstancePlanMakerImplV2Test.java    | 104 +++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   4 +
 3 files changed, 145 insertions(+), 6 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 371ee1874b9..2980ff7ee2a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -105,6 +105,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
 
   private final FetchPlanner _fetchPlanner = FetchPlannerRegistry.getPlanner();
   private int _maxExecutionThreads = 
Server.DEFAULT_QUERY_EXECUTOR_MAX_EXECUTION_THREADS;
+  private int _defaultExecutionThreads = 
Server.DEFAULT_QUERY_EXECUTOR_DEFAULT_EXECUTION_THREADS;
   private int _maxInitialResultHolderCapacity = 
Server.DEFAULT_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
   private int _minInitialIndexedTableCapacity = 
Server.DEFAULT_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
   // Limit on number of groups stored for each segment, beyond which no new 
group will be created
@@ -120,6 +121,9 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   public void init(PinotConfiguration queryExecutorConfig) {
     _maxExecutionThreads = 
queryExecutorConfig.getProperty(Server.MAX_EXECUTION_THREADS,
         Server.DEFAULT_QUERY_EXECUTOR_MAX_EXECUTION_THREADS);
+    _defaultExecutionThreads = 
queryExecutorConfig.getProperty(Server.DEFAULT_EXECUTION_THREADS,
+        Server.DEFAULT_QUERY_EXECUTOR_DEFAULT_EXECUTION_THREADS);
+    validateExecutionThreadConfig();
     _maxInitialResultHolderCapacity = 
queryExecutorConfig.getProperty(Server.MAX_INITIAL_RESULT_HOLDER_CAPACITY,
         Server.DEFAULT_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
     _minInitialIndexedTableCapacity = 
queryExecutorConfig.getProperty(Server.MIN_INITIAL_INDEXED_TABLE_CAPACITY,
@@ -142,10 +146,31 @@ public class InstancePlanMakerImplV2 implements PlanMaker 
{
         Server.DEFAULT_QUERY_EXECUTOR_GROUPBY_TRIM_THRESHOLD);
     Preconditions.checkState(_groupByTrimThreshold > 0,
         "Invalid configurable: groupByTrimThreshold: %d must be positive", 
_groupByTrimThreshold);
-    LOGGER.info("Initialized plan maker with maxExecutionThreads: {}, 
maxInitialResultHolderCapacity: {}, "
-            + "numGroupsLimit: {}, minSegmentGroupTrimSize: {}, 
minServerGroupTrimSize: {}, groupByTrimThreshold: {}",
-        _maxExecutionThreads, _maxInitialResultHolderCapacity, 
_numGroupsLimit, _minSegmentGroupTrimSize,
-        _minServerGroupTrimSize, _groupByTrimThreshold);
+    LOGGER.info("Initialized plan maker with maxExecutionThreads: {}, 
defaultExecutionThreads: {}, "
+            + "maxInitialResultHolderCapacity: {}, numGroupsLimit: {}, 
minSegmentGroupTrimSize: {}, "
+            + "minServerGroupTrimSize: {}, groupByTrimThreshold: {}",
+        _maxExecutionThreads, _defaultExecutionThreads, 
_maxInitialResultHolderCapacity, _numGroupsLimit,
+        _minSegmentGroupTrimSize, _minServerGroupTrimSize, 
_groupByTrimThreshold);
+  }
+
+  @VisibleForTesting
+  public void setMaxExecutionThreads(int maxExecutionThreads) {
+    _maxExecutionThreads = maxExecutionThreads;
+    validateExecutionThreadConfig();
+  }
+
+  @VisibleForTesting
+  public void setDefaultExecutionThreads(int defaultExecutionThreads) {
+    _defaultExecutionThreads = defaultExecutionThreads;
+    validateExecutionThreadConfig();
+  }
+
+  private void validateExecutionThreadConfig() {
+    if (_defaultExecutionThreads > 0 && _maxExecutionThreads > 0) {
+      Preconditions.checkState(_defaultExecutionThreads <= 
_maxExecutionThreads,
+          "Invalid configuration: defaultExecutionThreads: %d must be <= 
maxExecutionThreads: %d",
+          _defaultExecutionThreads, _maxExecutionThreads);
+    }
   }
 
   @VisibleForTesting
@@ -208,7 +233,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
         new InstanceResponsePlanNode(combinePlanNode, segmentContexts, 
fetchContexts, queryContext));
   }
 
-  private void applyQueryOptions(QueryContext queryContext) {
+  @VisibleForTesting
+  void applyQueryOptions(QueryContext queryContext) {
     Map<String, String> queryOptions = queryContext.getQueryOptions();
 
     // Set skipUpsert
@@ -227,15 +253,20 @@ public class InstancePlanMakerImplV2 implements PlanMaker 
{
     
queryContext.setSkipIndexes(QueryOptionsUtils.getSkipIndexes(queryOptions));
 
     // Set maxExecutionThreads
+    // Resolution order:
+    //   1. Per-query override (SET maxExecutionThreads=N) — capped by server 
max
+    //   2. Server-level default (default.execution.threads) — decoupled from 
max, but still capped by it
+    //   3. Server-level max (max.execution.threads) — legacy fallback
     int maxExecutionThreads;
     Integer maxExecutionThreadsFromQuery = 
QueryOptionsUtils.getMaxExecutionThreads(queryOptions);
     if (maxExecutionThreadsFromQuery != null) {
-      // Do not allow query to override the execution threads over the 
instance-level limit
       if (_maxExecutionThreads > 0) {
         maxExecutionThreads = Math.min(_maxExecutionThreads, 
maxExecutionThreadsFromQuery);
       } else {
         maxExecutionThreads = maxExecutionThreadsFromQuery;
       }
+    } else if (_defaultExecutionThreads > 0) {
+      maxExecutionThreads = _defaultExecutionThreads;
     } else {
       maxExecutionThreads = _maxExecutionThreads;
     }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2Test.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2Test.java
new file mode 100644
index 00000000000..18cba523a35
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2Test.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan.maker;
+
+import java.util.Map;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Tests for execution-thread resolution in {@link InstancePlanMakerImplV2}, 
covering the interplay
+ * between {@code max.execution.threads}, {@code default.execution.threads}, 
and per-query overrides.
+ */
+public class InstancePlanMakerImplV2Test {
+
+  private static final String BASE_QUERY = "SELECT * FROM testTable";
+
+  private static QueryContext buildQueryContext(String 
maxExecutionThreadsOption) {
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext(BASE_QUERY);
+    if (maxExecutionThreadsOption != null) {
+      Map<String, String> queryOptions = queryContext.getQueryOptions();
+      queryOptions.put(QueryOptionKey.MAX_EXECUTION_THREADS, 
maxExecutionThreadsOption);
+    }
+    return queryContext;
+  }
+
+  @Test
+  public void testDefaultNotSetFallsBackToMax() {
+    InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+    planMaker.setMaxExecutionThreads(12);
+
+    QueryContext queryContext = buildQueryContext(null);
+    planMaker.applyQueryOptions(queryContext);
+
+    assertEquals(queryContext.getMaxExecutionThreads(), 12);
+  }
+
+  @Test
+  public void testDefaultExecutionThreadsUsedWhenSet() {
+    InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+    planMaker.setMaxExecutionThreads(16);
+    planMaker.setDefaultExecutionThreads(4);
+
+    QueryContext queryContext = buildQueryContext(null);
+    planMaker.applyQueryOptions(queryContext);
+
+    assertEquals(queryContext.getMaxExecutionThreads(), 4);
+  }
+
+  @Test
+  public void testQueryOverrideOverridesDefault() {
+    InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+    planMaker.setMaxExecutionThreads(16);
+    planMaker.setDefaultExecutionThreads(4);
+
+    QueryContext queryContext = buildQueryContext("10");
+    planMaker.applyQueryOptions(queryContext);
+
+    assertEquals(queryContext.getMaxExecutionThreads(), 10);
+  }
+
+  @Test
+  public void testQueryOverrideCappedByMax() {
+    InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+    planMaker.setMaxExecutionThreads(8);
+    planMaker.setDefaultExecutionThreads(4);
+
+    QueryContext queryContext = buildQueryContext("20");
+    planMaker.applyQueryOptions(queryContext);
+
+    assertEquals(queryContext.getMaxExecutionThreads(), 8);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void testInitRejectsDefaultExceedingMax() {
+    InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2();
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty(Server.MAX_EXECUTION_THREADS, 4);
+    config.setProperty(Server.DEFAULT_EXECUTION_THREADS, 8);
+    planMaker.init(config);
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 8111f976912..ecd52695a2e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1323,6 +1323,10 @@ public class CommonConstants {
     public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_EXECUTION_THREADS =
         QUERY_EXECUTOR_CONFIG_PREFIX + "." + MAX_EXECUTION_THREADS;
     public static final int DEFAULT_QUERY_EXECUTOR_MAX_EXECUTION_THREADS = -1; 
 // Use number of CPU cores
+    public static final String DEFAULT_EXECUTION_THREADS = 
"default.execution.threads";
+    public static final String 
CONFIG_OF_QUERY_EXECUTOR_DEFAULT_EXECUTION_THREADS =
+        QUERY_EXECUTOR_CONFIG_PREFIX + "." + DEFAULT_EXECUTION_THREADS;
+    public static final int DEFAULT_QUERY_EXECUTOR_DEFAULT_EXECUTION_THREADS = 
-1;  // Not set; fall back to max
 
     // OOM protection: heap usage throttle configuration
     public static final String CONFIG_OF_HEAP_USAGE_THROTTLE_QUEUE_MAX_SIZE =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to