This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bccfb7f449 fix: getOnlyNonBroadcastInputAsStageId should ignore non-stage inputs. (#19201)
0bccfb7f449 is described below

commit 0bccfb7f449cd8f953b5cc6c86a17509acba5c3f
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Mar 24 14:28:00 2026 -0700

    fix: getOnlyNonBroadcastInputAsStageId should ignore non-stage inputs. (#19201)
    
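    getOnlyNonBroadcastInputAsStageId is a helper in ControllerQueryKernelUtils
    that the controller uses to decide whether a stage can run in MEMORY output
    mode. The helper erroneously counted every non-broadcast input, when it
    should only count non-broadcast stage-typed inputs. As a result, a
    downstream stage with one stage input plus a non-stage (leaf) input made
    the helper return null, which ruled out MEMORY output for the upstream
    stage. The helper is renamed to getOnlyNonBroadcastStageInputAsStageId to
    reflect the corrected behavior.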
---
 .../controller/ControllerQueryKernelUtils.java     | 25 ++++++++------
 .../controller/ControllerQueryKernelUtilsTest.java | 40 ++++++++++++++++++++++
 .../controller/MockQueryDefinitionBuilder.java     | 22 ++++++++++++
 3 files changed, 76 insertions(+), 11 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtils.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtils.java
index d971f33a9f2..e0f84bfa6da 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtils.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtils.java
@@ -26,7 +26,7 @@ import org.apache.druid.msq.exec.OutputChannelMode;
 import org.apache.druid.msq.indexing.destination.MSQDestination;
 import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
 import org.apache.druid.msq.input.InputSpec;
-import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.input.stage.StageInputSpec;
 import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.StageId;
@@ -330,7 +330,7 @@ public class ControllerQueryKernelUtils
       //      can only support a single reader.
       //   2) Downstream stages can only have a single input stage with output 
mode MEMORY. This isn't strictly
       //      necessary, but it simplifies the logic around concurrently 
launching stages.
-      return 
stageId.equals(getOnlyNonBroadcastInputAsStageId(outflowStageDef));
+      return 
stageId.equals(getOnlyNonBroadcastStageInputAsStageId(outflowStageDef));
     } else {
       return false;
     }
@@ -365,25 +365,28 @@ public class ControllerQueryKernelUtils
    * This is a helper used by {@link #canUseMemoryOutput}.
    */
   @Nullable
-  public static StageId getOnlyNonBroadcastInputAsStageId(final 
StageDefinition downstreamStageDef)
+  public static StageId getOnlyNonBroadcastStageInputAsStageId(final 
StageDefinition downstreamStageDef)
   {
     final List<InputSpec> inputSpecs = downstreamStageDef.getInputSpecs();
     final IntSet broadcastInputNumbers = 
downstreamStageDef.getBroadcastInputNumbers();
 
-    if (inputSpecs.size() - broadcastInputNumbers.size() != 1) {
-      return null;
-    }
+    StageId found = null;
 
     for (int i = 0; i < inputSpecs.size(); i++) {
-      if (!broadcastInputNumbers.contains(i)) {
-        final IntSet stageNumbers = 
InputSpecs.getStageNumbers(Collections.singletonList(inputSpecs.get(i)));
-        if (stageNumbers.size() == 1) {
-          return new StageId(downstreamStageDef.getId().getQueryId(), 
stageNumbers.iterator().nextInt());
+      if (!broadcastInputNumbers.contains(i) && inputSpecs.get(i) instanceof 
StageInputSpec) {
+        if (found != null) {
+          // More than one non-broadcast stage input.
+          return null;
         }
+
+        found = new StageId(
+            downstreamStageDef.getId().getQueryId(),
+            ((StageInputSpec) inputSpecs.get(i)).getStageNumber()
+        );
       }
     }
 
-    return null;
+    return found;
   }
 
   /**
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtilsTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtilsTest.java
index b6bb5bb3d4e..8830ffe63e3 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtilsTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelUtilsTest.java
@@ -302,6 +302,34 @@ public class ControllerQueryKernelUtilsTest
     );
   }
 
+  @Test
+  public void 
test_computeStageGroups_linearWithLeafInput_pipeline_threeAtOnce()
+  {
+    // 0 -> 1 (with leaf input) -> 2
+    // Stage 1 has a stage input from 0 and a non-stage (leaf) input.
+    // canUseMemoryOutput for stage 0 should still return true, because stage 
1 has
+    // only a single non-broadcast *stage* input.
+
+    final QueryDefinition queryDef = makeLinearQueryDefinitionWithLeafInput();
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            makeStageGroup(queryDef.getQueryId(), OutputChannelMode.MEMORY, 0, 
1, 2)
+        ),
+        ControllerQueryKernelUtils.computeStageGroups(
+            queryDef,
+            ControllerQueryKernelConfig
+                .builder()
+                .maxRetainedPartitionSketchBytes(1)
+                .maxConcurrentStages(3)
+                .pipeline(true)
+                .faultTolerance(false)
+                .destination(TaskReportMSQDestination.instance())
+                .build()
+        )
+    );
+  }
+
   @Test
   public void test_computeStageGroups_fanIn()
   {
@@ -487,6 +515,18 @@ public class ControllerQueryKernelUtilsTest
         .build();
   }
 
+  private static QueryDefinition makeLinearQueryDefinitionWithLeafInput()
+  {
+    // 0 -> 1 -> 2, where stage 1 also has a non-stage (leaf) input
+
+    return new MockQueryDefinitionBuilder(3)
+        .addEdge(0, 1)
+        .addNonStageInput(1)
+        .addEdge(1, 2)
+        .getQueryDefinitionBuilder()
+        .build();
+  }
+
   private static QueryDefinition makeFanInQueryDefinition()
   {
     // 0 -> 2 -> 3
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
index a2d88a875c2..c0d5157f96e 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
@@ -67,6 +67,9 @@ public class MockQueryDefinitionBuilder
   // would have an entry like B : [ <A, isBroadcast>, ... ]
   private final Map<Integer, Set<IntBooleanPair>> adjacencyList = new 
HashMap<>();
 
+  // Maps a stage to the number of non-stage inputs it should have
+  private final Map<Integer, Integer> nonStageInputCounts = new HashMap<>();
+
   // Keeps a collection of those stages that have been already defined
   private final Set<Integer> definedStages = new HashSet<>();
 
@@ -84,6 +87,20 @@ public class MockQueryDefinitionBuilder
     return addEdge(outVertex, inVertex, false);
   }
 
+  public MockQueryDefinitionBuilder addNonStageInput(final int stageNumber)
+  {
+    Preconditions.checkArgument(
+        stageNumber < numStages,
+        "vertex number can only be from 0 to one less than the total number of 
stages"
+    );
+    Preconditions.checkArgument(
+        !definedStages.contains(stageNumber),
+        StringUtils.format("%s is already defined, cannot add non-stage 
inputs", stageNumber)
+    );
+    nonStageInputCounts.merge(stageNumber, 1, Integer::sum);
+    return this;
+  }
+
   public MockQueryDefinitionBuilder addEdge(final int outVertex, final int 
inVertex, final boolean broadcast)
   {
     Preconditions.checkArgument(
@@ -182,6 +199,11 @@ public class MockQueryDefinitionBuilder
       inputNumber++;
     }
 
+    final int numNonStageInputs = 
nonStageInputCounts.getOrDefault(stageNumber, 0);
+    for (int i = 0; i < numNonStageInputs; i++) {
+      inputSpecs.add(new ControllerTestInputSpec());
+    }
+
     if (inputSpecs.isEmpty()) {
       for (int i = 0; i < maxWorkers; i++) {
         inputSpecs.add(new ControllerTestInputSpec());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to