This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0bccfb7f449 fix: getOnlyNonBroadcastInputAsStageId should ignore
non-stage inputs. (#19201)
0bccfb7f449 is described below
commit 0bccfb7f449cd8f953b5cc6c86a17509acba5c3f
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Mar 24 14:28:00 2026 -0700
fix: getOnlyNonBroadcastInputAsStageId should ignore non-stage inputs.
(#19201)
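getOnlyNonBroadcastInputAsStageId is a helper in ControllerQueryKernelUtils,
used by canUseMemoryOutput, that helps the controller determine whether a
stage can run in MEMORY output mode. The helper was erroneously considering
all non-broadcast inputs, when it should only consider non-broadcast
stage-typed inputs (StageInputSpec). It is renamed to
getOnlyNonBroadcastStageInputAsStageId to reflect this.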
---
.../controller/ControllerQueryKernelUtils.java | 25 ++++++++------
.../controller/ControllerQueryKernelUtilsTest.java | 40 ++++++++++++++++++++++
.../controller/MockQueryDefinitionBuilder.java | 22 ++++++++++++
3 files changed, 76 insertions(+), 11 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtils.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtils.java
index d971f33a9f2..e0f84bfa6da 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtils.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtils.java
@@ -26,7 +26,7 @@ import org.apache.druid.msq.exec.OutputChannelMode;
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.input.InputSpec;
-import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageId;
@@ -330,7 +330,7 @@ public class ControllerQueryKernelUtils
// can only support a single reader.
// 2) Downstream stages can only have a single input stage with output
mode MEMORY. This isn't strictly
// necessary, but it simplifies the logic around concurrently
launching stages.
- return
stageId.equals(getOnlyNonBroadcastInputAsStageId(outflowStageDef));
+ return
stageId.equals(getOnlyNonBroadcastStageInputAsStageId(outflowStageDef));
} else {
return false;
}
@@ -365,25 +365,28 @@ public class ControllerQueryKernelUtils
* This is a helper used by {@link #canUseMemoryOutput}.
*/
@Nullable
- public static StageId getOnlyNonBroadcastInputAsStageId(final
StageDefinition downstreamStageDef)
+ public static StageId getOnlyNonBroadcastStageInputAsStageId(final
StageDefinition downstreamStageDef)
{
final List<InputSpec> inputSpecs = downstreamStageDef.getInputSpecs();
final IntSet broadcastInputNumbers =
downstreamStageDef.getBroadcastInputNumbers();
- if (inputSpecs.size() - broadcastInputNumbers.size() != 1) {
- return null;
- }
+ StageId found = null;
for (int i = 0; i < inputSpecs.size(); i++) {
- if (!broadcastInputNumbers.contains(i)) {
- final IntSet stageNumbers =
InputSpecs.getStageNumbers(Collections.singletonList(inputSpecs.get(i)));
- if (stageNumbers.size() == 1) {
- return new StageId(downstreamStageDef.getId().getQueryId(),
stageNumbers.iterator().nextInt());
+ if (!broadcastInputNumbers.contains(i) && inputSpecs.get(i) instanceof
StageInputSpec) {
+ if (found != null) {
+ // More than one non-broadcast stage input.
+ return null;
}
+
+ found = new StageId(
+ downstreamStageDef.getId().getQueryId(),
+ ((StageInputSpec) inputSpecs.get(i)).getStageNumber()
+ );
}
}
- return null;
+ return found;
}
/**
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtilsTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtilsTest.java
index b6bb5bb3d4e..8830ffe63e3 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtilsTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtilsTest.java
@@ -302,6 +302,34 @@ public class ControllerQueryKernelUtilsTest
);
}
+ @Test
+ public void
test_computeStageGroups_linearWithLeafInput_pipeline_threeAtOnce()
+ {
+ // 0 -> 1 (with leaf input) -> 2
+ // Stage 1 has a stage input from 0 and a non-stage (leaf) input.
+ // canUseMemoryOutput for stage 0 should still return true, because stage
1 has
+ // only a single non-broadcast *stage* input.
+
+ final QueryDefinition queryDef = makeLinearQueryDefinitionWithLeafInput();
+
+ Assert.assertEquals(
+ ImmutableList.of(
+ makeStageGroup(queryDef.getQueryId(), OutputChannelMode.MEMORY, 0,
1, 2)
+ ),
+ ControllerQueryKernelUtils.computeStageGroups(
+ queryDef,
+ ControllerQueryKernelConfig
+ .builder()
+ .maxRetainedPartitionSketchBytes(1)
+ .maxConcurrentStages(3)
+ .pipeline(true)
+ .faultTolerance(false)
+ .destination(TaskReportMSQDestination.instance())
+ .build()
+ )
+ );
+ }
+
@Test
public void test_computeStageGroups_fanIn()
{
@@ -487,6 +515,18 @@ public class ControllerQueryKernelUtilsTest
.build();
}
+ private static QueryDefinition makeLinearQueryDefinitionWithLeafInput()
+ {
+ // 0 -> 1 -> 2, where stage 1 also has a non-stage (leaf) input
+
+ return new MockQueryDefinitionBuilder(3)
+ .addEdge(0, 1)
+ .addNonStageInput(1)
+ .addEdge(1, 2)
+ .getQueryDefinitionBuilder()
+ .build();
+ }
+
private static QueryDefinition makeFanInQueryDefinition()
{
// 0 -> 2 -> 3
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
index a2d88a875c2..c0d5157f96e 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
@@ -67,6 +67,9 @@ public class MockQueryDefinitionBuilder
// would have an entry like B : [ <A, isBroadcast>, ... ]
private final Map<Integer, Set<IntBooleanPair>> adjacencyList = new
HashMap<>();
+ // Maps a stage to the number of non-stage inputs it should have
+ private final Map<Integer, Integer> nonStageInputCounts = new HashMap<>();
+
// Keeps a collection of those stages that have been already defined
private final Set<Integer> definedStages = new HashSet<>();
@@ -84,6 +87,20 @@ public class MockQueryDefinitionBuilder
return addEdge(outVertex, inVertex, false);
}
+ public MockQueryDefinitionBuilder addNonStageInput(final int stageNumber)
+ {
+ Preconditions.checkArgument(
+ stageNumber < numStages,
+ "vertex number can only be from 0 to one less than the total number of
stages"
+ );
+ Preconditions.checkArgument(
+ !definedStages.contains(stageNumber),
+ StringUtils.format("%s is already defined, cannot add non-stage
inputs", stageNumber)
+ );
+ nonStageInputCounts.merge(stageNumber, 1, Integer::sum);
+ return this;
+ }
+
public MockQueryDefinitionBuilder addEdge(final int outVertex, final int
inVertex, final boolean broadcast)
{
Preconditions.checkArgument(
@@ -182,6 +199,11 @@ public class MockQueryDefinitionBuilder
inputNumber++;
}
+ final int numNonStageInputs =
nonStageInputCounts.getOrDefault(stageNumber, 0);
+ for (int i = 0; i < numNonStageInputs; i++) {
+ inputSpecs.add(new ControllerTestInputSpec());
+ }
+
if (inputSpecs.isEmpty()) {
for (int i = 0; i < maxWorkers; i++) {
inputSpecs.add(new ControllerTestInputSpec());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]