This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7bbfa039403 feat: Make segmentLoadAheadCount able to be configured at
worker task level in addition to context (#19559)
7bbfa039403 is described below
commit 7bbfa039403196aa45783b1601ca631b3c3cf4cb
Author: Lucas Capistrant <[email protected]>
AuthorDate: Mon Jun 8 18:16:08 2026 -0500
feat: Make segmentLoadAheadCount able to be configured at worker task level
in addition to context (#19559)
* Make segmentLoadAheadCount able to be configured at worker task level in
addition to context
* fixups based on review
---
docs/querying/dart.md | 1 +
.../druid/msq/dart/guice/DartWorkerConfig.java | 24 +++++++
.../druid/msq/dart/worker/DartWorkerContext.java | 39 +++++++++++
.../dart/worker/DartWorkerContextFactoryImpl.java | 5 ++
.../apache/druid/msq/exec/ExecutionContext.java | 7 ++
.../druid/msq/exec/ExecutionContextImpl.java | 9 +++
.../org/apache/druid/msq/exec/RunWorkOrder.java | 1 +
.../org/apache/druid/msq/exec/WorkerContext.java | 14 ++++
.../druid/msq/querykit/BaseLeafStageProcessor.java | 5 +-
.../druid/msq/util/MultiStageQueryContext.java | 5 +-
.../DartWorkerContextSegmentLoadAheadTest.java | 79 ++++++++++++++++++++++
11 files changed, 184 insertions(+), 5 deletions(-)
diff --git a/docs/querying/dart.md b/docs/querying/dart.md
index b68158b5c3e..772983dd908 100644
--- a/docs/querying/dart.md
+++ b/docs/querying/dart.md
@@ -73,6 +73,7 @@ For Historicals, you can set the following configs:
|---|---|---|
| `druid.msq.dart.worker.concurrentQueries` | Maximum number of query workers
that can run concurrently on a Historical. We recommend leaving this config at
the default value. If you need to change this value, set it to a value equal to or
larger than `druid.msq.dart.controller.concurrentQueries` on your Brokers. If
you don't, queries can get stuck waiting for each other. Don't set it to a
value higher than the number of merge buffers. | Equal to the number of merge
buffers |
| `druid.msq.dart.worker.heapFraction` | Maximum amount of heap available for
use across all Dart queries as a decimal. | 0.35 (35% of heap) |
+| `druid.msq.dart.worker.segmentLoadAheadCount` | Number of segments a worker
will prefetch ahead of processing them. This lets tiers with different hardware
tune prefetch independently. It acts as a worker-local default: a value
supplied in the query context (`segmentLoadAheadCount`) always takes precedence
when it is set. Non-positive values are treated as unset. | Unset (uses twice
the number of processing threads) |
## Run a Dart query
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerConfig.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerConfig.java
index f7322a1af92..4953972256d 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerConfig.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerConfig.java
@@ -21,6 +21,9 @@ package org.apache.druid.msq.dart.guice;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.msq.exec.MemoryIntrospector;
+import org.apache.druid.msq.querykit.ReadableInputQueue;
+
+import javax.annotation.Nullable;
/**
* Runtime configuration for workers (which run on Historicals).
@@ -41,6 +44,21 @@ public class DartWorkerConfig
@JsonProperty("heapFraction")
private double heapFraction = DEFAULT_HEAP_FRACTION;
+ /**
+ * Worker-local value for the segment load-ahead count used to size segment
prefetch in {@link ReadableInputQueue}.
+ * <p>
+ * Defaults to null (unset), which leaves the value to query context (client
or controller-default supplied) and the
+ * built-in {@code 2 * threadCount} fallback. When set to a positive value,
it acts as a worker-local default used
+ * only when the query context does not supply a value.
+ * <p>
+ * This is per-worker-process configuration, set independently on each
worker. It lets workers with different
+ * hardware (for example, separate tiers with more or less memory and
storage bandwidth) tune their own prefetch
+ * depth, rather than relying solely on a single cluster-wide query-context
default.
+ */
+ @JsonProperty("segmentLoadAheadCount")
+ @Nullable
+ private Integer segmentLoadAheadCount = null;
+
public int getConcurrentQueries()
{
return concurrentQueries;
@@ -50,4 +68,10 @@ public class DartWorkerConfig
{
return heapFraction;
}
+
+ @Nullable
+ public Integer getSegmentLoadAheadCount()
+ {
+ return segmentLoadAheadCount;
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
index e43d79d29f7..b80abcdeb24 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
@@ -30,6 +30,7 @@ import
org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.messages.server.Outbox;
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
import org.apache.druid.msq.dart.controller.messages.PostCounters;
+import org.apache.druid.msq.dart.guice.DartWorkerConfig;
import org.apache.druid.msq.exec.ControllerClient;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.FrameContext;
@@ -57,6 +58,7 @@ import org.apache.druid.server.SegmentManager;
import org.apache.druid.utils.CloseableUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
@@ -92,6 +94,13 @@ public class DartWorkerContext implements WorkerContext
private final ServiceEmitter emitter;
private final int threadCount;
+ /**
+ * Worker-local segment load-ahead count from {@link
DartWorkerConfig#getSegmentLoadAheadCount()}, or null if unset.
+ * Used as the default in {@link #segmentLoadAheadCount(WorkOrder)} when the
query context does not supply a value.
+ */
+ @Nullable
+ private final Integer segmentLoadAheadCountConfig;
+
/**
* Lazy initialized upon call to {@link #frameContext(WorkOrder)}.
*/
@@ -109,6 +118,7 @@ public class DartWorkerContext implements WorkerContext
final Injector injector,
final DartWorkerClient workerClient,
final DruidProcessingConfig processingConfig,
+ final DartWorkerConfig workerConfig,
final SegmentWrangler segmentWrangler,
final SegmentManager segmentManager,
final VirtualStorageManager virtualStorageManager,
@@ -148,6 +158,9 @@ public class DartWorkerContext implements WorkerContext
final int baseThreadCount = processingConfig.getNumThreads();
final Integer maxThreads =
MultiStageQueryContext.getMaxThreads(queryContext);
this.threadCount = (maxThreads != null && maxThreads > 0) ?
Math.min(baseThreadCount, maxThreads) : baseThreadCount;
+
+ // Worker-local segment load-ahead config from this worker's
DartWorkerConfig.
+ this.segmentLoadAheadCountConfig = workerConfig.getSegmentLoadAheadCount();
}
@Override
@@ -277,6 +290,32 @@ public class DartWorkerContext implements WorkerContext
return threadCount;
}
+ @Override
+ public int segmentLoadAheadCount(final WorkOrder workOrder)
+ {
+ final Integer fromContext =
MultiStageQueryContext.getSegmentLoadAheadCount(workOrder.getWorkerContext());
+ return resolveSegmentLoadAheadCount(fromContext,
segmentLoadAheadCountConfig, threadCount);
+ }
+
+  /**
+   * Determine which of the three potential sources of segment load-ahead count to use.
+   * <p>
+   * Precedence is: a non-null value supplied in the query context wins; otherwise the worker-local config is used
+   * when set to a positive value; lastly we fall back to {@code 2 * threadCount}.
+   * <p>
+   * Note the asymmetry: only {@code workerConfig} is checked for positivity. A non-null {@code fromContext} is
+   * returned as-is, even if it is zero or negative.
+   *
+   * @param fromContext  value from the query context, or null if the context does not supply one
+   * @param workerConfig worker-local configured value, or null if unset; non-positive values are treated as unset
+   * @param threadCount  thread count used to compute the {@code 2 * threadCount} fallback
+   * @return the effective segment load-ahead count
+   */
+  static int resolveSegmentLoadAheadCount(
+      @Nullable final Integer fromContext,
+      @Nullable final Integer workerConfig,
+      final int threadCount
+  )
+  {
+    if (fromContext != null) {
+      return fromContext;
+    }
+    final boolean hasWorkerConfig = workerConfig != null && workerConfig > 0;
+    return hasWorkerConfig ? workerConfig : threadCount * 2;
+  }
+
@Override
public boolean includeAllCounters()
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
index 16e8e187b98..d0fb2a8be42 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
@@ -31,6 +31,7 @@ import
org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.messages.server.Outbox;
import org.apache.druid.msq.dart.Dart;
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
+import org.apache.druid.msq.dart.guice.DartWorkerConfig;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.msq.exec.ProcessingBuffersProvider;
import org.apache.druid.msq.exec.WorkerContext;
@@ -60,6 +61,7 @@ public class DartWorkerContextFactoryImpl implements
DartWorkerContextFactory
private final Injector injector;
private final ServiceClientFactory serviceClientFactory;
private final DruidProcessingConfig processingConfig;
+ private final DartWorkerConfig workerConfig;
private final SegmentWrangler segmentWrangler;
private final SegmentManager segmentManager;
private final VirtualStorageManager virtualStorageManager;
@@ -80,6 +82,7 @@ public class DartWorkerContextFactoryImpl implements
DartWorkerContextFactory
Injector injector,
@EscalatedGlobal ServiceClientFactory serviceClientFactory,
DruidProcessingConfig processingConfig,
+ DartWorkerConfig workerConfig,
SegmentWrangler segmentWrangler,
SegmentManager segmentManager,
VirtualStorageManager virtualStorageManager,
@@ -99,6 +102,7 @@ public class DartWorkerContextFactoryImpl implements
DartWorkerContextFactory
this.injector = injector;
this.serviceClientFactory = serviceClientFactory;
this.processingConfig = processingConfig;
+ this.workerConfig = workerConfig;
this.segmentWrangler = segmentWrangler;
this.coordinatorClient = coordinatorClient;
this.segmentManager = segmentManager;
@@ -128,6 +132,7 @@ public class DartWorkerContextFactoryImpl implements
DartWorkerContextFactory
injector,
createWorkerClient(queryId),
processingConfig,
+ workerConfig,
segmentWrangler,
segmentManager,
virtualStorageManager,
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContext.java
index 217513cad73..c86ddec3149 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContext.java
@@ -91,6 +91,13 @@ public interface ExecutionContext
*/
int threadCount();
+ /**
+ * Effective segment load-ahead count for {@link #workOrder()}, resolved by
+ * {@link WorkerContext#segmentLoadAheadCount(WorkOrder)}. Used to size the
segment prefetch in
+ * {@link org.apache.druid.msq.querykit.ReadableInputQueue}.
+ */
+ int segmentLoadAheadCount();
+
/**
* Cancellation ID that must be provided to {@link FrameProcessorExecutor}
when running work.
*/
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java
index 7560494754c..9dd2db79783 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExecutionContextImpl.java
@@ -50,6 +50,7 @@ public class ExecutionContextImpl implements ExecutionContext
private final FrameContext frameContext;
private final CounterTracker counters;
private final int maxOutstandingProcessors;
+ private final int segmentLoadAheadCount;
private final String cancellationId;
private final RunWorkOrderListener listener;
private final Set<String> intermediateOutputChannelFactoryNames =
Sets.newConcurrentHashSet();
@@ -65,6 +66,7 @@ public class ExecutionContextImpl implements ExecutionContext
final FrameContext frameContext,
final CounterTracker counters,
final int maxOutstandingProcessors,
+ final int segmentLoadAheadCount,
final String cancellationId,
final RunWorkOrderListener listener
)
@@ -79,6 +81,7 @@ public class ExecutionContextImpl implements ExecutionContext
this.frameContext = frameContext;
this.counters = counters;
this.maxOutstandingProcessors = maxOutstandingProcessors;
+ this.segmentLoadAheadCount = segmentLoadAheadCount;
this.cancellationId = cancellationId;
this.listener = listener;
}
@@ -147,6 +150,12 @@ public class ExecutionContextImpl implements
ExecutionContext
return maxOutstandingProcessors;
}
+ @Override
+ public int segmentLoadAheadCount()
+ {
+ return segmentLoadAheadCount;
+ }
+
@Override
public String cancellationId()
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index e064d78640e..1fc54d6f6ba 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -392,6 +392,7 @@ public class RunWorkOrder
frameContext,
counterTracker,
workerContext.threadCount(),
+ workerContext.segmentLoadAheadCount(workOrder),
cancellationId,
listener
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
index b225112236d..f8256464d9b 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
@@ -109,6 +109,20 @@ public interface WorkerContext extends Closeable
*/
int threadCount();
+  /**
+   * Effective number of segments to load ahead of when they are needed while processing the given
+   * {@code workOrder}, used to size the segment prefetch in
+   * {@link org.apache.druid.msq.querykit.ReadableInputQueue}.
+   * <p>
+   * The default honors {@link MultiStageQueryContext#CTX_SEGMENT_LOAD_AHEAD_COUNT} from the work order's context
+   * (set by the controller from client and broker-default context), and otherwise falls back to
+   * {@code 2 * threadCount()}. A non-null context value is returned unchanged; this default does not range-check it.
+   * Implementations may override to layer in worker-local configuration.
+   *
+   * @param workOrder work order whose worker context is consulted for the load-ahead count
+   * @return the context-supplied count when present, otherwise {@code 2 * threadCount()}
+   */
+  default int segmentLoadAheadCount(WorkOrder workOrder)
+  {
+    final Integer fromContext =
MultiStageQueryContext.getSegmentLoadAheadCount(workOrder.getWorkerContext());
+    return fromContext != null ? fromContext : threadCount() * 2;
+  }
+
/**
* Fetch node info about self.
*/
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
index ec3884554e1..f655010b4f1 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java
@@ -51,7 +51,6 @@ import org.apache.druid.msq.input.external.ExternalInputSlice;
import org.apache.druid.msq.input.stage.StageInputSlice;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.kernel.StageDefinition;
-import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.filter.SegmentPruner;
import org.apache.druid.query.planning.ExecutionVertex;
@@ -294,12 +293,10 @@ public abstract class BaseLeafStageProcessor extends
BasicStageProcessor
}
final List<PhysicalInputSlice> filteredSlices =
filterBaseInput(physicalInputSlices);
- final Integer segmentLoadAheadCount =
-
MultiStageQueryContext.getSegmentLoadAheadCount(context.workOrder().getWorkerContext());
return new ReadableInputQueue(
new StandardPartitionReader(context),
filteredSlices,
- segmentLoadAheadCount != null ? segmentLoadAheadCount :
context.threadCount() * 2
+ context.segmentLoadAheadCount()
);
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index d731ca074ae..114e70a7132 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -278,7 +278,10 @@ public class MultiStageQueryContext
public static final String CTX_MAX_THREADS = "maxThreads";
/**
- * Maximum number of segments to load ahead of them being needed. Used when
setting up {@link ReadableInputQueue}.
+ * Number of segments to load ahead of them being needed. Used when setting
up {@link ReadableInputQueue}.
+ * <p>
+ * A worker may be configured with a local default for this value. When this
context value is set, it always wins;
+ * the worker-local default applies only when this context value is absent.
*/
public static final String CTX_SEGMENT_LOAD_AHEAD_COUNT =
"segmentLoadAheadCount";
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerContextSegmentLoadAheadTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerContextSegmentLoadAheadTest.java
new file mode 100644
index 00000000000..29e200d6a35
--- /dev/null
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerContextSegmentLoadAheadTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.worker;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DartWorkerContextSegmentLoadAheadTest
+{
+ private static final int THREAD_COUNT = 8;
+
+ @Test
+ public void test_noWorkerConfig_noContext_fallsBackToTwiceThreadCount()
+ {
+ assertEquals(16, DartWorkerContext.resolveSegmentLoadAheadCount(null,
null, THREAD_COUNT));
+ }
+
+ @Test
+ public void test_noWorkerConfig_withContext_usesContext()
+ {
+ assertEquals(40, DartWorkerContext.resolveSegmentLoadAheadCount(40, null,
THREAD_COUNT));
+ }
+
+ @Test
+ public void test_workerConfig_noContext_usesWorkerConfig()
+ {
+ assertEquals(128, DartWorkerContext.resolveSegmentLoadAheadCount(null,
128, THREAD_COUNT));
+ }
+
+ @Test
+ public void test_workerConfig_contextLower_contextWins()
+ {
+ assertEquals(64, DartWorkerContext.resolveSegmentLoadAheadCount(64, 128,
THREAD_COUNT));
+ }
+
+ @Test
+ public void test_workerConfig_contextHigher_contextWins()
+ {
+ assertEquals(256, DartWorkerContext.resolveSegmentLoadAheadCount(256, 128,
THREAD_COUNT));
+ }
+
+ @Test
+ public void test_workerConfig_contextEqual_returnsThatValue()
+ {
+ assertEquals(128, DartWorkerContext.resolveSegmentLoadAheadCount(128, 128,
THREAD_COUNT));
+ }
+
+ @Test
+ public void test_nonPositiveWorkerConfig_treatedAsUnset_withContext()
+ {
+ assertEquals(40, DartWorkerContext.resolveSegmentLoadAheadCount(40, 0,
THREAD_COUNT));
+ assertEquals(40, DartWorkerContext.resolveSegmentLoadAheadCount(40, -5,
THREAD_COUNT));
+ }
+
+ @Test
+ public void test_nonPositiveWorkerConfig_treatedAsUnset_noContext()
+ {
+ assertEquals(16, DartWorkerContext.resolveSegmentLoadAheadCount(null, 0,
THREAD_COUNT));
+ assertEquals(16, DartWorkerContext.resolveSegmentLoadAheadCount(null, -5,
THREAD_COUNT));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]