This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bbfa039403 feat: Make segmentLoadAheadCount able to be configured at worker task level in addition to context (#19559)
7bbfa039403 is described below

commit 7bbfa039403196aa45783b1601ca631b3c3cf4cb
Author: Lucas Capistrant <[email protected]>
AuthorDate: Mon Jun 8 18:16:08 2026 -0500

    feat: Make segmentLoadAheadCount able to be configured at worker task level in addition to context (#19559)
    
    * Make segmentLoadAheadCount able to be configured at worker task level in addition to context
    
    * fixups based on review
---
 docs/querying/dart.md                              |  1 +
 .../druid/msq/dart/guice/DartWorkerConfig.java     | 24 +++++++
 .../druid/msq/dart/worker/DartWorkerContext.java   | 39 +++++++++++
 .../dart/worker/DartWorkerContextFactoryImpl.java  |  5 ++
 .../apache/druid/msq/exec/ExecutionContext.java    |  7 ++
 .../druid/msq/exec/ExecutionContextImpl.java       |  9 +++
 .../org/apache/druid/msq/exec/RunWorkOrder.java    |  1 +
 .../org/apache/druid/msq/exec/WorkerContext.java   | 14 ++++
 .../druid/msq/querykit/BaseLeafStageProcessor.java |  5 +-
 .../druid/msq/util/MultiStageQueryContext.java     |  5 +-
 .../DartWorkerContextSegmentLoadAheadTest.java     | 79 ++++++++++++++++++++++
 11 files changed, 184 insertions(+), 5 deletions(-)

diff --git a/docs/querying/dart.md b/docs/querying/dart.md
index b68158b5c3e..772983dd908 100644
--- a/docs/querying/dart.md
+++ b/docs/querying/dart.md
@@ -73,6 +73,7 @@ For Historicals, you can set the following configs:
 |---|---|---|
 | `druid.msq.dart.worker.concurrentQueries` | Maximum number of query workers that can run concurrently on a Historical. We recommend leaving this config at the default value. If need to change this value, set it to a value equal to or larger than `druid.msq.dart.controller.concurrentQueries` on your Brokers. If you don't, queries can get stuck waiting for each other. Don't set it to a value higher than the number of merge buffers. | Equal to the number of merge buffers |
 | `druid.msq.dart.worker.heapFraction` | Maximum amount of heap available for use across all Dart queries as a decimal. | 0.35 (35% of heap) |
+| `druid.msq.dart.worker.segmentLoadAheadCount` | Number of segments a worker will prefetch ahead of processing them. This lets tiers with different hardware tune prefetch independently. It acts as a worker-local default: a value supplied in the query context (`segmentLoadAheadCount`) always takes precedence when it is set. Non-positive values are treated as unset. | Unset (uses twice the number of processing threads) |
 
 
 ## Run a Dart query
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerConfig.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerConfig.java
index f7322a1af92..4953972256d 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerConfig.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerConfig.java
@@ -21,6 +21,9 @@ package org.apache.druid.msq.dart.guice;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.msq.exec.MemoryIntrospector;
+import org.apache.druid.msq.querykit.ReadableInputQueue;
+
+import javax.annotation.Nullable;
 
 /**
  * Runtime configuration for workers (which run on Historicals).
@@ -41,6 +44,21 @@ public class DartWorkerConfig
   @JsonProperty("heapFraction")
   private double heapFraction = DEFAULT_HEAP_FRACTION;
 
+  /**
+   * Worker-local value for the segment load-ahead count used to size segment prefetch in {@link ReadableInputQueue}.
+   * <p>
+   * Defaults to null (unset), which leaves the value to query context (client or controller-default supplied) and the
+   * built-in {@code 2 * threadCount} fallback. When set to a positive value, it acts as a worker-local default used
+   * only when the query context does not supply a value.
+   * <p>
+   * This is per-worker-process configuration, set independently on each worker. It lets workers with different
+   * hardware (for example, separate tiers with more or less memory and storage bandwidth) tune their own prefetch
+   * depth, rather than relying solely on a single cluster-wide query-context default.
+   */
+  @JsonProperty("segmentLoadAheadCount")
+  @Nullable
+  private Integer segmentLoadAheadCount = null;
+
   public int getConcurrentQueries()
   {
     return concurrentQueries;
@@ -50,4 +68,10 @@ public class DartWorkerConfig
   {
     return heapFraction;
   }
+
+  @Nullable
+  public Integer getSegmentLoadAheadCount()
+  {
+    return segmentLoadAheadCount;
+  }
 }
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
index e43d79d29f7..b80abcdeb24 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
@@ -30,6 +30,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.messages.server.Outbox;
 import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
 import org.apache.druid.msq.dart.controller.messages.PostCounters;
+import org.apache.druid.msq.dart.guice.DartWorkerConfig;
 import org.apache.druid.msq.exec.ControllerClient;
 import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
 import org.apache.druid.msq.exec.FrameContext;
@@ -57,6 +58,7 @@ import org.apache.druid.server.SegmentManager;
 import org.apache.druid.utils.CloseableUtils;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.util.List;
 
@@ -92,6 +94,13 @@ public class DartWorkerContext implements WorkerContext
   private final ServiceEmitter emitter;
   private final int threadCount;
 
+  /**
+   * Worker-local segment load-ahead count from {@link DartWorkerConfig#getSegmentLoadAheadCount()}, or null if unset.
+   * Used as the default in {@link #segmentLoadAheadCount(WorkOrder)} when the query context does not supply a value.
+   */
+  @Nullable
+  private final Integer segmentLoadAheadCountConfig;
+
   /**
    * Lazy initialized upon call to {@link #frameContext(WorkOrder)}.
    */
@@ -109,6 +118,7 @@ public class DartWorkerContext implements WorkerContext
       final Injector injector,
       final DartWorkerClient workerClient,
       final DruidProcessingConfig processingConfig,
+      final DartWorkerConfig workerConfig,
       final SegmentWrangler segmentWrangler,
       final SegmentManager segmentManager,
       final VirtualStorageManager virtualStorageManager,
@@ -148,6 +158,9 @@ public class DartWorkerContext implements WorkerContext
     final int baseThreadCount = processingConfig.getNumThreads();
     final Integer maxThreads = MultiStageQueryContext.getMaxThreads(queryContext);
     this.threadCount = (maxThreads != null && maxThreads > 0) ? Math.min(baseThreadCount, maxThreads) : baseThreadCount;
+
+    // Worker-local segment load-ahead config from this worker's DartWorkerConfig.
+    this.segmentLoadAheadCountConfig = workerConfig.getSegmentLoadAheadCount();
   }
 
   @Override
@@ -277,6 +290,32 @@ public class DartWorkerContext implements WorkerContext
     return threadCount;
   }
 
+  @Override
+  public int segmentLoadAheadCount(final WorkOrder workOrder)
+  {
+    final Integer fromContext = MultiStageQueryContext.getSegmentLoadAheadCount(workOrder.getWorkerContext());
+    return resolveSegmentLoadAheadCount(fromContext, segmentLoadAheadCountConfig, threadCount);
+  }
+
+  /**
+   * Determine which of the three potential sources of segment load ahead count to use.
+   * <p>
+   * Precedence is: a value supplied in the query context wins when set; otherwise the worker-local config is used
+   * when set to a positive value; lastly we fall back to {@code 2 * threadCount}.
+   */
+  static int resolveSegmentLoadAheadCount(
+      @Nullable final Integer fromContext,
+      @Nullable final Integer workerConfig,
+      final int threadCount
+  )
+  {
+    if (fromContext != null) {
+      return fromContext;
+    }
+    final boolean hasWorkerConfig = workerConfig != null && workerConfig > 0;
+    return hasWorkerConfig ? workerConfig : threadCount * 2;
+  }
+
   @Override
   public boolean includeAllCounters()
   {
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
index 16e8e187b98..d0fb2a8be42 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.messages.server.Outbox;
 import org.apache.druid.msq.dart.Dart;
 import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
+import org.apache.druid.msq.dart.guice.DartWorkerConfig;
 import org.apache.druid.msq.exec.MemoryIntrospector;
 import org.apache.druid.msq.exec.ProcessingBuffersProvider;
 import org.apache.druid.msq.exec.WorkerContext;
@@ -60,6 +61,7 @@ public class DartWorkerContextFactoryImpl implements DartWorkerContextFactory
   private final Injector injector;
   private final ServiceClientFactory serviceClientFactory;
   private final DruidProcessingConfig processingConfig;
+  private final DartWorkerConfig workerConfig;
   private final SegmentWrangler segmentWrangler;
   private final SegmentManager segmentManager;
   private final VirtualStorageManager virtualStorageManager;
@@ -80,6 +82,7 @@ public class DartWorkerContextFactoryImpl implements DartWorkerContextFactory
       Injector injector,
       @EscalatedGlobal ServiceClientFactory serviceClientFactory,
       DruidProcessingConfig processingConfig,
+      DartWorkerConfig workerConfig,
       SegmentWrangler segmentWrangler,
       SegmentManager segmentManager,
       VirtualStorageManager virtualStorageManager,
@@ -99,6 +102,7 @@ public class DartWorkerContextFactoryImpl implements DartWorkerContextFactory
     this.injector = injector;
     this.serviceClientFactory = serviceClientFactory;
     this.processingConfig = processingConfig;
+    this.workerConfig = workerConfig;
     this.segmentWrangler = segmentWrangler;
     this.coordinatorClient = coordinatorClient;
     this.segmentManager = segmentManager;
@@ -128,6 +132,7 @@ public class DartWorkerContextFactoryImpl implements DartWorkerContextFactory
         injector,
         createWorkerClient(queryId),
         processingConfig,
+        workerConfig,
         segmentWrangler,
         segmentManager,
         virtualStorageManager,
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContext.java
index 217513cad73..c86ddec3149 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContext.java
@@ -91,6 +91,13 @@ public interface ExecutionContext
    */
   int threadCount();
 
+  /**
+   * Effective segment load-ahead count for {@link #workOrder()}, resolved by
+   * {@link WorkerContext#segmentLoadAheadCount(WorkOrder)}. Used to size the segment prefetch in
+   * {@link org.apache.druid.msq.querykit.ReadableInputQueue}.
+   */
+  int segmentLoadAheadCount();
+
   /**
    * Cancellation ID that must be provided to {@link FrameProcessorExecutor} when running work.
    */
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java
index 7560494754c..9dd2db79783 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java
@@ -50,6 +50,7 @@ public class ExecutionContextImpl implements ExecutionContext
   private final FrameContext frameContext;
   private final CounterTracker counters;
   private final int maxOutstandingProcessors;
+  private final int segmentLoadAheadCount;
   private final String cancellationId;
   private final RunWorkOrderListener listener;
   private final Set<String> intermediateOutputChannelFactoryNames = Sets.newConcurrentHashSet();
@@ -65,6 +66,7 @@ public class ExecutionContextImpl implements ExecutionContext
       final FrameContext frameContext,
       final CounterTracker counters,
       final int maxOutstandingProcessors,
+      final int segmentLoadAheadCount,
       final String cancellationId,
       final RunWorkOrderListener listener
   )
@@ -79,6 +81,7 @@ public class ExecutionContextImpl implements ExecutionContext
     this.frameContext = frameContext;
     this.counters = counters;
     this.maxOutstandingProcessors = maxOutstandingProcessors;
+    this.segmentLoadAheadCount = segmentLoadAheadCount;
     this.cancellationId = cancellationId;
     this.listener = listener;
   }
@@ -147,6 +150,12 @@ public class ExecutionContextImpl implements ExecutionContext
     return maxOutstandingProcessors;
   }
 
+  @Override
+  public int segmentLoadAheadCount()
+  {
+    return segmentLoadAheadCount;
+  }
+
   @Override
   public String cancellationId()
   {
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index e064d78640e..1fc54d6f6ba 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -392,6 +392,7 @@ public class RunWorkOrder
         frameContext,
         counterTracker,
         workerContext.threadCount(),
+        workerContext.segmentLoadAheadCount(workOrder),
         cancellationId,
         listener
     );
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
index b225112236d..f8256464d9b 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
@@ -109,6 +109,20 @@ public interface WorkerContext extends Closeable
    */
   int threadCount();
 
+  /**
+   * Effective number of segments to load ahead of when they are needed while processing the given {@code workOrder},
+   * used to size the segment prefetch in {@link org.apache.druid.msq.querykit.ReadableInputQueue}.
+   *
+   * The default honors {@link MultiStageQueryContext#CTX_SEGMENT_LOAD_AHEAD_COUNT} from the work order's context
+   * (set by the controller from client and broker-default context), and otherwise falls back to
+   * {@code 2 * threadCount()}. Implementations may override to layer in worker-local configuration.
+   */
+  default int segmentLoadAheadCount(WorkOrder workOrder)
+  {
+    final Integer fromContext = MultiStageQueryContext.getSegmentLoadAheadCount(workOrder.getWorkerContext());
+    return fromContext != null ? fromContext : threadCount() * 2;
+  }
+
   /**
    * Fetch node info about self.
    */
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
index ec3884554e1..f655010b4f1 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
@@ -51,7 +51,6 @@ import org.apache.druid.msq.input.external.ExternalInputSlice;
 import org.apache.druid.msq.input.stage.StageInputSlice;
 import org.apache.druid.msq.input.table.SegmentsInputSlice;
 import org.apache.druid.msq.kernel.StageDefinition;
-import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.filter.SegmentPruner;
 import org.apache.druid.query.planning.ExecutionVertex;
@@ -294,12 +293,10 @@ public abstract class BaseLeafStageProcessor extends BasicStageProcessor
     }
 
     final List<PhysicalInputSlice> filteredSlices = filterBaseInput(physicalInputSlices);
-    final Integer segmentLoadAheadCount =
-        MultiStageQueryContext.getSegmentLoadAheadCount(context.workOrder().getWorkerContext());
     return new ReadableInputQueue(
         new StandardPartitionReader(context),
         filteredSlices,
-        segmentLoadAheadCount != null ? segmentLoadAheadCount : context.threadCount() * 2
+        context.segmentLoadAheadCount()
     );
   }
 
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index d731ca074ae..114e70a7132 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -278,7 +278,10 @@ public class MultiStageQueryContext
   public static final String CTX_MAX_THREADS = "maxThreads";
 
   /**
-   * Maximum number of segments to load ahead of them being needed. Used when setting up {@link ReadableInputQueue}.
+   * Number of segments to load ahead of them being needed. Used when setting up {@link ReadableInputQueue}.
+   * <p>
+   * A worker may be configured with a local default for this value. When this context value is set, it always wins;
+   * the worker-local default applies only when this context value is absent.
    */
   public static final String CTX_SEGMENT_LOAD_AHEAD_COUNT = "segmentLoadAheadCount";
 
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerContextSegmentLoadAheadTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerContextSegmentLoadAheadTest.java
new file mode 100644
index 00000000000..29e200d6a35
--- /dev/null
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerContextSegmentLoadAheadTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.worker;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DartWorkerContextSegmentLoadAheadTest
+{
+  private static final int THREAD_COUNT = 8;
+
+  @Test
+  public void test_noWorkerConfig_noContext_fallsBackToTwiceThreadCount()
+  {
+    assertEquals(16, DartWorkerContext.resolveSegmentLoadAheadCount(null, null, THREAD_COUNT));
+  }
+
+  @Test
+  public void test_noWorkerConfig_withContext_usesContext()
+  {
+    assertEquals(40, DartWorkerContext.resolveSegmentLoadAheadCount(40, null, THREAD_COUNT));
+  }
+
+  @Test
+  public void test_workerConfig_noContext_usesWorkerConfig()
+  {
+    assertEquals(128, DartWorkerContext.resolveSegmentLoadAheadCount(null, 128, THREAD_COUNT));
+  }
+
+  @Test
+  public void test_workerConfig_contextLower_contextWins()
+  {
+    assertEquals(64, DartWorkerContext.resolveSegmentLoadAheadCount(64, 128, THREAD_COUNT));
+  }
+
+  @Test
+  public void test_workerConfig_contextHigher_contextWins()
+  {
+    assertEquals(256, DartWorkerContext.resolveSegmentLoadAheadCount(256, 128, THREAD_COUNT));
+  }
+
+  @Test
+  public void test_workerConfig_contextEqual_returnsThatValue()
+  {
+    assertEquals(128, DartWorkerContext.resolveSegmentLoadAheadCount(128, 128, THREAD_COUNT));
+  }
+
+  @Test
+  public void test_nonPositiveWorkerConfig_treatedAsUnset_withContext()
+  {
+    assertEquals(40, DartWorkerContext.resolveSegmentLoadAheadCount(40, 0, THREAD_COUNT));
+    assertEquals(40, DartWorkerContext.resolveSegmentLoadAheadCount(40, -5, THREAD_COUNT));
+  }
+
+  @Test
+  public void test_nonPositiveWorkerConfig_treatedAsUnset_noContext()
+  {
+    assertEquals(16, DartWorkerContext.resolveSegmentLoadAheadCount(null, 0, THREAD_COUNT));
+    assertEquals(16, DartWorkerContext.resolveSegmentLoadAheadCount(null, -5, THREAD_COUNT));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to