LakshSingla commented on code in PR #16168:
URL: https://github.com/apache/druid/pull/16168#discussion_r1572844744


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerUtils.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Utilties for {@link ControllerQueryKernel}.
+ */
+public class ControllerUtils

Review Comment:
   nit: ControllerQueryKernelUtils seems appropriate. There are many classes 
prefixed with Controller. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/StageGroup.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.kernel.StageId;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Group of stages that must be launched as a unit. Within each group, stages 
communicate with each other using
+ * {@link OutputChannelMode#MEMORY} channels. The final stage in a group 
writes its own output using
+ * {@link #lastStageOutputChannelMode()}.
+ */
+public class StageGroup
+{
+  private final List<StageId> stageIds;

Review Comment:
   It seems that there's an implicit contract that the flow of the data between 
the stage group is linear (A -> B -> C ...) , and cannot be branched (
   A
   |
   B
   | \
   C D
   |  /
   E
   )
   We should probably mention that. Unrelated, but is it subject to change in 
the future? 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/StageGroup.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.kernel.StageId;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Group of stages that must be launched as a unit. Within each group, stages 
communicate with each other using
+ * {@link OutputChannelMode#MEMORY} channels. The final stage in a group 
writes its own output using
+ * {@link #lastStageOutputChannelMode()}.
+ */
+public class StageGroup
+{
+  private final List<StageId> stageIds;
+  private final OutputChannelMode groupOutputChannelMode;
+
+  public StageGroup(final List<StageId> stageIds, final OutputChannelMode 
groupOutputChannelMode)
+  {
+    this.stageIds = stageIds;
+    this.groupOutputChannelMode = groupOutputChannelMode;
+  }
+
+  /**
+   * List of stage IDs in this group.

Review Comment:
   As mentioned previously, there's an implicit relation between the stages in 
the list, and [A, B, C] is not the same as [A, C, B] 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerUtils.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Utilties for {@link ControllerQueryKernel}.
+ */
+public class ControllerUtils
+{
+  /**
+   * Put stages from {@link QueryDefinition} into groups that must each be 
launched simultaneously.
+   *
+   * This method's goal is to maximize the usage of {@link 
OutputChannelMode#MEMORY} channels, subject to constraints
+   * provided by {@link ControllerQueryKernelConfig#isPipeline()},
+   * {@link ControllerQueryKernelConfig#getMaxConcurrentStages()}, and
+   * {@link ControllerQueryKernelConfig#isFaultTolerant()}.
+   */
+  public static List<StageGroup> computeStageGroups(
+      final QueryDefinition queryDef,
+      final ControllerQueryKernelConfig config
+  )
+  {
+    final MSQDestination destination = config.getDestination();
+    final List<StageGroup> stageGroups = new ArrayList<>();
+    final boolean useDurableStorage = config.isDurableStorage();
+    final Map<StageId, Set<StageId>> inflow = computeStageInflowMap(queryDef);
+    final Map<StageId, Set<StageId>> outflow = 
computeStageOutflowMap(queryDef);
+    final Set<StageId> stagesRun = new HashSet<>();
+
+    while (stagesRun.size() < queryDef.getStageDefinitions().size()) {
+      // 1) Run all stages that cannot stream their output, as solo groups.
+      boolean didRun;
+      do {
+        didRun = false;
+
+        for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+          if (!stagesRun.contains(stageId)
+              && inflow.get(stageId).isEmpty()
+              && !canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+            stagesRun.add(stageId);
+            stageGroups.add(
+                new StageGroup(
+                    Collections.singletonList(stageId),
+                    getOutputChannelMode(
+                        queryDef,
+                        stageId.getStageNumber(),
+                        destination.toSelectDestination(),
+                        useDurableStorage,
+                        false
+                    )
+                )
+            );
+
+            removeStageFlow(stageId, inflow, outflow);
+            didRun = true;
+          }
+        }
+      } while (didRun);
+
+      // 2) Pick some stage that can stream its output, and run that as well 
as all ready-to-run dependents.
+      StageId currentStageId = null;
+      for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+        if (!stagesRun.contains(stageId)
+            && inflow.get(stageId).isEmpty()
+            && canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+          currentStageId = stageId;
+          break;
+        }
+      }
+
+      if (currentStageId != null) {

Review Comment:
   nit: Might be cleaner to inverse the condition
   ```suggestion
         if (currentStageId == null) {
            continue;
         }
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerUtils.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Utilties for {@link ControllerQueryKernel}.
+ */
+public class ControllerUtils
+{
+  /**
+   * Put stages from {@link QueryDefinition} into groups that must each be 
launched simultaneously.
+   *
+   * This method's goal is to maximize the usage of {@link 
OutputChannelMode#MEMORY} channels, subject to constraints
+   * provided by {@link ControllerQueryKernelConfig#isPipeline()},
+   * {@link ControllerQueryKernelConfig#getMaxConcurrentStages()}, and
+   * {@link ControllerQueryKernelConfig#isFaultTolerant()}.
+   */
+  public static List<StageGroup> computeStageGroups(
+      final QueryDefinition queryDef,
+      final ControllerQueryKernelConfig config
+  )
+  {
+    final MSQDestination destination = config.getDestination();
+    final List<StageGroup> stageGroups = new ArrayList<>();
+    final boolean useDurableStorage = config.isDurableStorage();
+    final Map<StageId, Set<StageId>> inflow = computeStageInflowMap(queryDef);
+    final Map<StageId, Set<StageId>> outflow = 
computeStageOutflowMap(queryDef);
+    final Set<StageId> stagesRun = new HashSet<>();
+
+    while (stagesRun.size() < queryDef.getStageDefinitions().size()) {
+      // 1) Run all stages that cannot stream their output, as solo groups.
+      boolean didRun;
+      do {
+        didRun = false;
+
+        for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+          if (!stagesRun.contains(stageId)
+              && inflow.get(stageId).isEmpty()
+              && !canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+            stagesRun.add(stageId);
+            stageGroups.add(
+                new StageGroup(
+                    Collections.singletonList(stageId),
+                    getOutputChannelMode(
+                        queryDef,
+                        stageId.getStageNumber(),
+                        destination.toSelectDestination(),
+                        useDurableStorage,
+                        false
+                    )
+                )
+            );
+
+            removeStageFlow(stageId, inflow, outflow);
+            didRun = true;
+          }
+        }
+      } while (didRun);
+
+      // 2) Pick some stage that can stream its output, and run that as well 
as all ready-to-run dependents.
+      StageId currentStageId = null;
+      for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+        if (!stagesRun.contains(stageId)
+            && inflow.get(stageId).isEmpty()
+            && canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+          currentStageId = stageId;
+          break;
+        }
+      }
+
+      if (currentStageId != null) {
+        final List<StageId> currentStageGroup = new ArrayList<>();
+        final int maxStageGroupSize;
+        if (stageGroups.isEmpty()) {
+          maxStageGroupSize = config.getMaxConcurrentStages();
+        } else {
+          final StageGroup priorGroup = stageGroups.get(stageGroups.size() - 
1);
+          if (priorGroup.lastStageOutputChannelMode() == 
OutputChannelMode.MEMORY) {
+            // Prior group must run concurrently with this group.
+            maxStageGroupSize = config.getMaxConcurrentStages() - 
priorGroup.size();
+          } else {
+            // Prior group can exit before this group starts.
+            maxStageGroupSize = config.getMaxConcurrentStages();
+          }
+        }
+
+        OutputChannelMode currentOutputChannelMode = null;
+        while (currentStageId != null) {
+          final boolean canStream = canStreamOutput(queryDef, 
currentStageId.getStageNumber(), config, outflow);
+          final Set<StageId> currentOutflow = outflow.get(currentStageId);
+
+          final int maxStageGroupSizeAllowingForDownstreamConsumer;
+          if 
(queryDef.getStageDefinition(currentStageId).doesSortDuringShuffle()) {
+            // When the current group sorts, there's a pipeline break, so we 
can "leapfrog": close the prior group

Review Comment:
   There's this notion of a group sorting, that is tied to the stage's sorting. 
   
   When is a group considered to be sorting, when all, first, or the last of 
the stages sort? Logically, it should be first, but there's no indication in 
the code that implies that. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerUtils.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Utilties for {@link ControllerQueryKernel}.
+ */
+public class ControllerUtils
+{
+  /**
+   * Put stages from {@link QueryDefinition} into groups that must each be 
launched simultaneously.
+   *
+   * This method's goal is to maximize the usage of {@link 
OutputChannelMode#MEMORY} channels, subject to constraints
+   * provided by {@link ControllerQueryKernelConfig#isPipeline()},
+   * {@link ControllerQueryKernelConfig#getMaxConcurrentStages()}, and
+   * {@link ControllerQueryKernelConfig#isFaultTolerant()}.
+   */
+  public static List<StageGroup> computeStageGroups(
+      final QueryDefinition queryDef,
+      final ControllerQueryKernelConfig config
+  )
+  {
+    final MSQDestination destination = config.getDestination();
+    final List<StageGroup> stageGroups = new ArrayList<>();
+    final boolean useDurableStorage = config.isDurableStorage();
+    final Map<StageId, Set<StageId>> inflow = computeStageInflowMap(queryDef);
+    final Map<StageId, Set<StageId>> outflow = 
computeStageOutflowMap(queryDef);
+    final Set<StageId> stagesRun = new HashSet<>();
+
+    while (stagesRun.size() < queryDef.getStageDefinitions().size()) {
+      // 1) Run all stages that cannot stream their output, as solo groups.
+      boolean didRun;
+      do {
+        didRun = false;
+
+        for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+          if (!stagesRun.contains(stageId)
+              && inflow.get(stageId).isEmpty()
+              && !canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+            stagesRun.add(stageId);
+            stageGroups.add(
+                new StageGroup(
+                    Collections.singletonList(stageId),
+                    getOutputChannelMode(
+                        queryDef,
+                        stageId.getStageNumber(),
+                        destination.toSelectDestination(),
+                        useDurableStorage,
+                        false
+                    )
+                )
+            );
+
+            removeStageFlow(stageId, inflow, outflow);
+            didRun = true;
+          }
+        }
+      } while (didRun);
+
+      // 2) Pick some stage that can stream its output, and run that as well 
as all ready-to-run dependents.
+      StageId currentStageId = null;
+      for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+        if (!stagesRun.contains(stageId)
+            && inflow.get(stageId).isEmpty()
+            && canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+          currentStageId = stageId;
+          break;
+        }
+      }
+
+      if (currentStageId != null) {
+        final List<StageId> currentStageGroup = new ArrayList<>();
+        final int maxStageGroupSize;
+        if (stageGroups.isEmpty()) {
+          maxStageGroupSize = config.getMaxConcurrentStages();
+        } else {
+          final StageGroup priorGroup = stageGroups.get(stageGroups.size() - 
1);
+          if (priorGroup.lastStageOutputChannelMode() == 
OutputChannelMode.MEMORY) {
+            // Prior group must run concurrently with this group.
+            maxStageGroupSize = config.getMaxConcurrentStages() - 
priorGroup.size();
+          } else {
+            // Prior group can exit before this group starts.
+            maxStageGroupSize = config.getMaxConcurrentStages();
+          }
+        }
+
+        OutputChannelMode currentOutputChannelMode = null;
+        while (currentStageId != null) {
+          final boolean canStream = canStreamOutput(queryDef, 
currentStageId.getStageNumber(), config, outflow);
+          final Set<StageId> currentOutflow = outflow.get(currentStageId);
+
+          final int maxStageGroupSizeAllowingForDownstreamConsumer;
+          if 
(queryDef.getStageDefinition(currentStageId).doesSortDuringShuffle()) {
+            // When the current group sorts, there's a pipeline break, so we 
can "leapfrog": close the prior group
+            // before starting the downstream group.
+            maxStageGroupSizeAllowingForDownstreamConsumer = 
config.getMaxConcurrentStages() - 1;
+          } else {
+            // When the current group doesn't sort, we can't leapfrog.
+            maxStageGroupSizeAllowingForDownstreamConsumer = maxStageGroupSize 
- 1;
+          }
+
+          currentOutputChannelMode =
+              getOutputChannelMode(
+                  queryDef,
+                  currentStageId.getStageNumber(),
+                  config.getDestination().toSelectDestination(),
+                  config.isDurableStorage(),
+                  canStream
+
+                  // Stages can only stream if they have <= 1 downstream stage 
(checked by "canStreamOutput") and if
+                  // that downstream stage has all of its other inputs 
available.
+                  && (currentOutflow.isEmpty() ||
+                      Collections.singleton(currentStageId)
+                                 
.equals(inflow.get(Iterables.getOnlyElement(currentOutflow))))
+
+                  // Stages can only stream if that one downstream consumer is 
able to start concurrently. So, once the
+                  // stage group gets too big, we must stop streaming.
+                  && (currentOutflow.isEmpty()
+                      || currentStageGroup.size() < 
maxStageGroupSizeAllowingForDownstreamConsumer)
+              );
+
+          currentStageGroup.add(currentStageId);
+
+          if (currentOutflow.size() == 1
+              && currentStageGroup.size() < maxStageGroupSize
+              && currentOutputChannelMode == OutputChannelMode.MEMORY
+
+              // Sorting causes a pipeline break: a sorting stage must read 
all its input before writing any output.
+              // Continue the stage group only if this stage does not sort its 
output.
+              && 
!queryDef.getStageDefinition(currentStageId).doesSortDuringShuffle()) {
+            currentStageId = Iterables.getOnlyElement(currentOutflow);
+          } else {
+            currentStageId = null;
+          }
+        }
+
+        stageGroups.add(new StageGroup(currentStageGroup, 
currentOutputChannelMode));
+
+        for (final StageId stageId : currentStageGroup) {
+          stagesRun.add(stageId);
+          removeStageFlow(stageId, inflow, outflow);
+        }
+      }
+    }
+
+    return stageGroups;
+  }
+
+  /**
+   * Returns a mapping of stage -> stages that flow *into* that stage. Uses 
TreeMaps and TreeSets so we have a
+   * consistent order for analyzing and running stages.
+   */
+  public static Map<StageId, Set<StageId>> computeStageInflowMap(final 
QueryDefinition queryDefinition)
+  {
+    final Map<StageId, Set<StageId>> retVal = new TreeMap<>();
+
+    for (final StageDefinition stageDef : 
queryDefinition.getStageDefinitions()) {
+      final StageId stageId = stageDef.getId();
+      retVal.computeIfAbsent(stageId, ignored -> new TreeSet<>());
+
+      for (final int inputStageNumber : 
queryDefinition.getStageDefinition(stageId).getInputStageNumbers()) {
+        final StageId inputStageId = new StageId(queryDefinition.getQueryId(), 
inputStageNumber);
+        retVal.computeIfAbsent(stageId, ignored -> new 
TreeSet<>()).add(inputStageId);
+      }
+    }
+
+    return retVal;
+  }
+
+  /**
+   * Returns a mapping of stage -> stages that depend on that stage. Uses 
TreeMaps and TreeSets so we have a consistent
+   * order for analyzing and running stages.
+   */
+  public static Map<StageId, Set<StageId>> computeStageOutflowMap(final 
QueryDefinition queryDefinition)
+  {
+    final Map<StageId, Set<StageId>> retVal = new TreeMap<>();
+
+    for (final StageDefinition stageDef : 
queryDefinition.getStageDefinitions()) {
+      final StageId stageId = stageDef.getId();
+      retVal.computeIfAbsent(stageId, ignored -> new TreeSet<>());
+
+      for (final int inputStageNumber : 
queryDefinition.getStageDefinition(stageId).getInputStageNumbers()) {
+        final StageId inputStageId = new StageId(queryDefinition.getQueryId(), 
inputStageNumber);
+        retVal.computeIfAbsent(inputStageId, ignored -> new 
TreeSet<>()).add(stageId);
+      }
+    }
+
+    return retVal;
+  }
+
+  /**
+   * Whether output of a stage can possibly use {@link 
OutputChannelMode#MEMORY}. Returning true does not guarantee
+   * that the stage *will* use {@link OutputChannelMode#MEMORY}. Additional 
requirements are checked in
+   * {@link #computeStageGroups(QueryDefinition, ControllerQueryKernelConfig)}.
+   */
+  public static boolean canStreamOutput(
+      final QueryDefinition queryDefinition,
+      final int stageNumber,
+      final ControllerQueryKernelConfig config,
+      final Map<StageId, Set<StageId>> outflowMap
+  )
+  {
+    if (config.isFaultTolerant()) {
+      // Cannot stream if fault tolerance is enabled: durable storage is 
required.
+      return false;
+    }
+
+    if (!config.isPipeline() || config.getMaxConcurrentStages() < 2) {
+      // Cannot stream if pipelining (& running multiple stages at once) is 
not enabled.
+      return false;
+    }
+
+    final StageId stageId = 
queryDefinition.getStageDefinition(stageNumber).getId();
+    final Set<StageId> outflowStageIds = outflowMap.get(stageId);
+
+    if (outflowStageIds.isEmpty()) {
+      return true;
+    } else if (outflowStageIds.size() == 1) {
+      final StageDefinition outflowStageDef =
+          
queryDefinition.getStageDefinition(Iterables.getOnlyElement(outflowStageIds));
+
+      // Two things happening here:
+      //   1) Stages cannot both stream and broadcast their output. This is 
because when streaming, we can only
+      //      support a single reader.
+      //   2) Stages can only receive a single streamed input. This isn't 
strictly necessary, but it simplifies the
+      //      logic around concurrently launching stages.
+      return 
stageId.equals(getOnlyNonBroadcastInputAsStageId(outflowStageDef));
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Return an {@link OutputChannelMode} to use for a given stage, based on 
query and context.
+   */
+  public static OutputChannelMode getOutputChannelMode(
+      final QueryDefinition queryDef,
+      final int stageNumber,
+      @Nullable final MSQSelectDestination selectDestination,
+      final boolean durableStorage,
+      final boolean canStream
+  )
+  {
+    final boolean isFinalStage = 
queryDef.getFinalStageDefinition().getStageNumber() == stageNumber;
+
+    if (isFinalStage && selectDestination == 
MSQSelectDestination.DURABLESTORAGE) {
+      return OutputChannelMode.DURABLE_STORAGE_QUERY_RESULTS;
+    } else if (canStream) {
+      return OutputChannelMode.MEMORY;
+    } else if (durableStorage) {
+      return OutputChannelMode.DURABLE_STORAGE_INTERMEDIATE;
+    } else {
+      return OutputChannelMode.LOCAL_STORAGE;
+    }
+  }
+
+  /**
+   * If a stage has a single non-broadcast input stage, returns that input 
stage. Otherwise, returns null.
+   * This is a helper used by {@link #canStreamOutput}.
+   */
+  @Nullable
+  public static StageId getOnlyNonBroadcastInputAsStageId(final 
StageDefinition downstreamStageDef)
+  {
+    final List<InputSpec> inputSpecs = downstreamStageDef.getInputSpecs();
+    final IntSet broadcastInputNumbers = 
downstreamStageDef.getBroadcastInputNumbers();
+
+    if (inputSpecs.size() - broadcastInputNumbers.size() != 1) {
+      return null;
+    }
+
+    for (int i = 0; i < inputSpecs.size(); i++) {
+      if (!broadcastInputNumbers.contains(i)) {
+        final IntSet stageNumbers = 
InputSpecs.getStageNumbers(Collections.singletonList(inputSpecs.get(i)));
+        if (stageNumbers.size() == 1) {
+          return new StageId(downstreamStageDef.getId().getQueryId(), 
stageNumbers.iterator().nextInt());
+        }
+      }
+    }
+
+    return null;
+  }
+
+  private static void removeStageFlow(

Review Comment:
   nit: Javadoc about preconditions and postconditions of the method, and what 
modifications it makes to the input maps.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerUtils.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Utilties for {@link ControllerQueryKernel}.
+ */
+public class ControllerUtils
+{
+  /**
+   * Put stages from {@link QueryDefinition} into groups that must each be 
launched simultaneously.
+   *
+   * This method's goal is to maximize the usage of {@link 
OutputChannelMode#MEMORY} channels, subject to constraints
+   * provided by {@link ControllerQueryKernelConfig#isPipeline()},
+   * {@link ControllerQueryKernelConfig#getMaxConcurrentStages()}, and
+   * {@link ControllerQueryKernelConfig#isFaultTolerant()}.
+   */
+  public static List<StageGroup> computeStageGroups(
+      final QueryDefinition queryDef,
+      final ControllerQueryKernelConfig config
+  )
+  {
+    final MSQDestination destination = config.getDestination();
+    final List<StageGroup> stageGroups = new ArrayList<>();
+    final boolean useDurableStorage = config.isDurableStorage();
+    final Map<StageId, Set<StageId>> inflow = computeStageInflowMap(queryDef);
+    final Map<StageId, Set<StageId>> outflow = 
computeStageOutflowMap(queryDef);
+    final Set<StageId> stagesRun = new HashSet<>();
+
+    while (stagesRun.size() < queryDef.getStageDefinitions().size()) {
+      // 1) Run all stages that cannot stream their output, as solo groups.
+      boolean didRun;
+      do {
+        didRun = false;
+
+        for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+          if (!stagesRun.contains(stageId)
+              && inflow.get(stageId).isEmpty()

Review Comment:
   What's the significance of inflow being empty?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerUtils.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Utilties for {@link ControllerQueryKernel}.
+ */
+public class ControllerUtils
+{
+  /**
+   * Put stages from {@link QueryDefinition} into groups that must each be 
launched simultaneously.
+   *
+   * This method's goal is to maximize the usage of {@link 
OutputChannelMode#MEMORY} channels, subject to constraints
+   * provided by {@link ControllerQueryKernelConfig#isPipeline()},
+   * {@link ControllerQueryKernelConfig#getMaxConcurrentStages()}, and
+   * {@link ControllerQueryKernelConfig#isFaultTolerant()}.
+   */
+  public static List<StageGroup> computeStageGroups(
+      final QueryDefinition queryDef,
+      final ControllerQueryKernelConfig config
+  )
+  {
+    final MSQDestination destination = config.getDestination();
+    final List<StageGroup> stageGroups = new ArrayList<>();
+    final boolean useDurableStorage = config.isDurableStorage();
+    final Map<StageId, Set<StageId>> inflow = computeStageInflowMap(queryDef);
+    final Map<StageId, Set<StageId>> outflow = 
computeStageOutflowMap(queryDef);
+    final Set<StageId> stagesRun = new HashSet<>();
+
+    while (stagesRun.size() < queryDef.getStageDefinitions().size()) {
+      // 1) Run all stages that cannot stream their output, as solo groups.
+      boolean didRun;
+      do {
+        didRun = false;
+
+        for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+          if (!stagesRun.contains(stageId)
+              && inflow.get(stageId).isEmpty()
+              && !canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+            stagesRun.add(stageId);
+            stageGroups.add(
+                new StageGroup(
+                    Collections.singletonList(stageId),
+                    getOutputChannelMode(
+                        queryDef,
+                        stageId.getStageNumber(),
+                        destination.toSelectDestination(),
+                        useDurableStorage,
+                        false
+                    )
+                )
+            );
+
+            removeStageFlow(stageId, inflow, outflow);
+            didRun = true;
+          }
+        }
+      } while (didRun);
+
+      // 2) Pick some stage that can stream its output, and run that as well 
as all ready-to-run dependents.
+      StageId currentStageId = null;
+      for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {

Review Comment:
   There's an implicit assumption that the inflow's keySet is sorted, which is 
true since it is actually a TreeMap, however isn't reflected here. Perhaps we 
should tighten the contract that the inflow map is a TreeMap, instead of a Map 
(which can be a HashMap as well, which wouldn't work well with this logic)



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerUtils.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Utilties for {@link ControllerQueryKernel}.
+ */
+public class ControllerUtils
+{
+  /**
+   * Put stages from {@link QueryDefinition} into groups that must each be 
launched simultaneously.
+   *
+   * This method's goal is to maximize the usage of {@link 
OutputChannelMode#MEMORY} channels, subject to constraints
+   * provided by {@link ControllerQueryKernelConfig#isPipeline()},
+   * {@link ControllerQueryKernelConfig#getMaxConcurrentStages()}, and
+   * {@link ControllerQueryKernelConfig#isFaultTolerant()}.
+   */
+  public static List<StageGroup> computeStageGroups(
+      final QueryDefinition queryDef,
+      final ControllerQueryKernelConfig config
+  )
+  {
+    final MSQDestination destination = config.getDestination();
+    final List<StageGroup> stageGroups = new ArrayList<>();
+    final boolean useDurableStorage = config.isDurableStorage();
+    final Map<StageId, Set<StageId>> inflow = computeStageInflowMap(queryDef);
+    final Map<StageId, Set<StageId>> outflow = 
computeStageOutflowMap(queryDef);
+    final Set<StageId> stagesRun = new HashSet<>();
+
+    while (stagesRun.size() < queryDef.getStageDefinitions().size()) {
+      // 1) Run all stages that cannot stream their output, as solo groups.
+      boolean didRun;
+      do {
+        didRun = false;
+
+        for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+          if (!stagesRun.contains(stageId)
+              && inflow.get(stageId).isEmpty()
+              && !canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+            stagesRun.add(stageId);
+            stageGroups.add(
+                new StageGroup(
+                    Collections.singletonList(stageId),
+                    getOutputChannelMode(
+                        queryDef,
+                        stageId.getStageNumber(),
+                        destination.toSelectDestination(),
+                        useDurableStorage,
+                        false
+                    )
+                )
+            );
+
+            removeStageFlow(stageId, inflow, outflow);
+            didRun = true;
+          }
+        }
+      } while (didRun);
+
+      // 2) Pick some stage that can stream its output, and run that as well 
as all ready-to-run dependents.
+      StageId currentStageId = null;
+      for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+        if (!stagesRun.contains(stageId)
+            && inflow.get(stageId).isEmpty()
+            && canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+          currentStageId = stageId;
+          break;
+        }
+      }
+
+      if (currentStageId != null) {
+        final List<StageId> currentStageGroup = new ArrayList<>();
+        final int maxStageGroupSize;
+        if (stageGroups.isEmpty()) {
+          maxStageGroupSize = config.getMaxConcurrentStages();
+        } else {
+          final StageGroup priorGroup = stageGroups.get(stageGroups.size() - 
1);
+          if (priorGroup.lastStageOutputChannelMode() == 
OutputChannelMode.MEMORY) {
+            // Prior group must run concurrently with this group.

Review Comment:
   Doesn't this defy the logic of stage groups - each group runs individually, 
and all the stages within a group run simultanously. If the prior group runs 
concurrently with this group, shouldn't they be coalesced into a single group? 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerUtils.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Utilties for {@link ControllerQueryKernel}.
+ */
+public class ControllerUtils
+{
+  /**
+   * Put stages from {@link QueryDefinition} into groups that must each be 
launched simultaneously.
+   *
+   * This method's goal is to maximize the usage of {@link 
OutputChannelMode#MEMORY} channels, subject to constraints
+   * provided by {@link ControllerQueryKernelConfig#isPipeline()},
+   * {@link ControllerQueryKernelConfig#getMaxConcurrentStages()}, and
+   * {@link ControllerQueryKernelConfig#isFaultTolerant()}.
+   */
+  public static List<StageGroup> computeStageGroups(
+      final QueryDefinition queryDef,
+      final ControllerQueryKernelConfig config
+  )
+  {
+    final MSQDestination destination = config.getDestination();
+    final List<StageGroup> stageGroups = new ArrayList<>();
+    final boolean useDurableStorage = config.isDurableStorage();
+    final Map<StageId, Set<StageId>> inflow = computeStageInflowMap(queryDef);
+    final Map<StageId, Set<StageId>> outflow = 
computeStageOutflowMap(queryDef);
+    final Set<StageId> stagesRun = new HashSet<>();
+
+    while (stagesRun.size() < queryDef.getStageDefinitions().size()) {
+      // 1) Run all stages that cannot stream their output, as solo groups.
+      boolean didRun;
+      do {
+        didRun = false;
+
+        for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+          if (!stagesRun.contains(stageId)
+              && inflow.get(stageId).isEmpty()
+              && !canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+            stagesRun.add(stageId);
+            stageGroups.add(
+                new StageGroup(
+                    Collections.singletonList(stageId),
+                    getOutputChannelMode(
+                        queryDef,
+                        stageId.getStageNumber(),
+                        destination.toSelectDestination(),
+                        useDurableStorage,
+                        false
+                    )
+                )
+            );
+
+            removeStageFlow(stageId, inflow, outflow);
+            didRun = true;
+          }
+        }
+      } while (didRun);
+
+      // 2) Pick some stage that can stream its output, and run that as well 
as all ready-to-run dependents.
+      StageId currentStageId = null;
+      for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+        if (!stagesRun.contains(stageId)
+            && inflow.get(stageId).isEmpty()
+            && canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+          currentStageId = stageId;
+          break;
+        }
+      }
+
+      if (currentStageId != null) {
+        final List<StageId> currentStageGroup = new ArrayList<>();
+        final int maxStageGroupSize;
+        if (stageGroups.isEmpty()) {
+          maxStageGroupSize = config.getMaxConcurrentStages();
+        } else {
+          final StageGroup priorGroup = stageGroups.get(stageGroups.size() - 
1);
+          if (priorGroup.lastStageOutputChannelMode() == 
OutputChannelMode.MEMORY) {
+            // Prior group must run concurrently with this group.
+            maxStageGroupSize = config.getMaxConcurrentStages() - 
priorGroup.size();

Review Comment:
   Can this ever be 0, and break the logic. Partially related to the question 
posed above. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerUtils.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Utilties for {@link ControllerQueryKernel}.
+ */
+public class ControllerUtils
+{
+  /**
+   * Put stages from {@link QueryDefinition} into groups that must each be 
launched simultaneously.
+   *
+   * This method's goal is to maximize the usage of {@link 
OutputChannelMode#MEMORY} channels, subject to constraints
+   * provided by {@link ControllerQueryKernelConfig#isPipeline()},
+   * {@link ControllerQueryKernelConfig#getMaxConcurrentStages()}, and
+   * {@link ControllerQueryKernelConfig#isFaultTolerant()}.
+   */
+  public static List<StageGroup> computeStageGroups(
+      final QueryDefinition queryDef,
+      final ControllerQueryKernelConfig config
+  )
+  {
+    final MSQDestination destination = config.getDestination();
+    final List<StageGroup> stageGroups = new ArrayList<>();
+    final boolean useDurableStorage = config.isDurableStorage();
+    final Map<StageId, Set<StageId>> inflow = computeStageInflowMap(queryDef);
+    final Map<StageId, Set<StageId>> outflow = 
computeStageOutflowMap(queryDef);
+    final Set<StageId> stagesRun = new HashSet<>();
+
+    while (stagesRun.size() < queryDef.getStageDefinitions().size()) {
+      // 1) Run all stages that cannot stream their output, as solo groups.
+      boolean didRun;
+      do {
+        didRun = false;
+
+        for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+          if (!stagesRun.contains(stageId)
+              && inflow.get(stageId).isEmpty()
+              && !canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+            stagesRun.add(stageId);
+            stageGroups.add(
+                new StageGroup(
+                    Collections.singletonList(stageId),
+                    getOutputChannelMode(
+                        queryDef,
+                        stageId.getStageNumber(),
+                        destination.toSelectDestination(),
+                        useDurableStorage,
+                        false
+                    )
+                )
+            );
+
+            removeStageFlow(stageId, inflow, outflow);
+            didRun = true;
+          }
+        }
+      } while (didRun);
+
+      // 2) Pick some stage that can stream its output, and run that as well 
as all ready-to-run dependents.
+      StageId currentStageId = null;
+      for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+        if (!stagesRun.contains(stageId)
+            && inflow.get(stageId).isEmpty()
+            && canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+          currentStageId = stageId;
+          break;
+        }
+      }
+
+      if (currentStageId != null) {
+        final List<StageId> currentStageGroup = new ArrayList<>();
+        final int maxStageGroupSize;
+        if (stageGroups.isEmpty()) {
+          maxStageGroupSize = config.getMaxConcurrentStages();
+        } else {
+          final StageGroup priorGroup = stageGroups.get(stageGroups.size() - 
1);
+          if (priorGroup.lastStageOutputChannelMode() == 
OutputChannelMode.MEMORY) {
+            // Prior group must run concurrently with this group.
+            maxStageGroupSize = config.getMaxConcurrentStages() - 
priorGroup.size();
+          } else {
+            // Prior group can exit before this group starts.
+            maxStageGroupSize = config.getMaxConcurrentStages();
+          }
+        }
+
+        OutputChannelMode currentOutputChannelMode = null;
+        while (currentStageId != null) {
+          final boolean canStream = canStreamOutput(queryDef, 
currentStageId.getStageNumber(), config, outflow);
+          final Set<StageId> currentOutflow = outflow.get(currentStageId);
+
+          final int maxStageGroupSizeAllowingForDownstreamConsumer;
+          if 
(queryDef.getStageDefinition(currentStageId).doesSortDuringShuffle()) {
+            // When the current group sorts, there's a pipeline break, so we 
can "leapfrog": close the prior group
+            // before starting the downstream group.
+            maxStageGroupSizeAllowingForDownstreamConsumer = 
config.getMaxConcurrentStages() - 1;
+          } else {
+            // When the current group doesn't sort, we can't leapfrog.
+            maxStageGroupSizeAllowingForDownstreamConsumer = maxStageGroupSize 
- 1;
+          }
+
+          currentOutputChannelMode =
+              getOutputChannelMode(
+                  queryDef,
+                  currentStageId.getStageNumber(),
+                  config.getDestination().toSelectDestination(),
+                  config.isDurableStorage(),
+                  canStream
+
+                  // Stages can only stream if they have <= 1 downstream stage 
(checked by "canStreamOutput") and if
+                  // that downstream stage has all of its other inputs 
available.
+                  && (currentOutflow.isEmpty() ||
+                      Collections.singleton(currentStageId)
+                                 
.equals(inflow.get(Iterables.getOnlyElement(currentOutflow))))
+
+                  // Stages can only stream if that one downstream consumer is 
able to start concurrently. So, once the
+                  // stage group gets too big, we must stop streaming.
+                  && (currentOutflow.isEmpty()
+                      || currentStageGroup.size() < 
maxStageGroupSizeAllowingForDownstreamConsumer)
+              );
+
+          currentStageGroup.add(currentStageId);
+
+          if (currentOutflow.size() == 1
+              && currentStageGroup.size() < maxStageGroupSize
+              && currentOutputChannelMode == OutputChannelMode.MEMORY
+
+              // Sorting causes a pipeline break: a sorting stage must read 
all its input before writing any output.
+              // Continue the stage group only if this stage does not sort its 
output.
+              && 
!queryDef.getStageDefinition(currentStageId).doesSortDuringShuffle()) {
+            currentStageId = Iterables.getOnlyElement(currentOutflow);
+          } else {
+            currentStageId = null;
+          }
+        }
+
+        stageGroups.add(new StageGroup(currentStageGroup, 
currentOutputChannelMode));
+
+        for (final StageId stageId : currentStageGroup) {
+          stagesRun.add(stageId);
+          removeStageFlow(stageId, inflow, outflow);
+        }
+      }
+    }
+
+    return stageGroups;
+  }
+
+  /**
+   * Returns a mapping of stage -> stages that flow *into* that stage. Uses 
TreeMaps and TreeSets so we have a
+   * consistent order for analyzing and running stages.
+   */
+  public static Map<StageId, Set<StageId>> computeStageInflowMap(final 
QueryDefinition queryDefinition)
+  {
+    final Map<StageId, Set<StageId>> retVal = new TreeMap<>();
+
+    for (final StageDefinition stageDef : 
queryDefinition.getStageDefinitions()) {
+      final StageId stageId = stageDef.getId();
+      retVal.computeIfAbsent(stageId, ignored -> new TreeSet<>());
+
+      for (final int inputStageNumber : 
queryDefinition.getStageDefinition(stageId).getInputStageNumbers()) {
+        final StageId inputStageId = new StageId(queryDefinition.getQueryId(), 
inputStageNumber);
+        retVal.computeIfAbsent(stageId, ignored -> new 
TreeSet<>()).add(inputStageId);
+      }
+    }
+
+    return retVal;
+  }
+
+  /**
+   * Returns a mapping of stage -> stages that depend on that stage. Uses 
TreeMaps and TreeSets so we have a consistent
+   * order for analyzing and running stages.
+   */
+  public static Map<StageId, Set<StageId>> computeStageOutflowMap(final 
QueryDefinition queryDefinition)
+  {
+    final Map<StageId, Set<StageId>> retVal = new TreeMap<>();
+
+    for (final StageDefinition stageDef : 
queryDefinition.getStageDefinitions()) {
+      final StageId stageId = stageDef.getId();
+      retVal.computeIfAbsent(stageId, ignored -> new TreeSet<>());
+
+      for (final int inputStageNumber : 
queryDefinition.getStageDefinition(stageId).getInputStageNumbers()) {
+        final StageId inputStageId = new StageId(queryDefinition.getQueryId(), 
inputStageNumber);
+        retVal.computeIfAbsent(inputStageId, ignored -> new 
TreeSet<>()).add(stageId);
+      }
+    }
+
+    return retVal;
+  }
+
+  /**
+   * Whether output of a stage can possibly use {@link 
OutputChannelMode#MEMORY}. Returning true does not guarantee
+   * that the stage *will* use {@link OutputChannelMode#MEMORY}. Additional 
requirements are checked in
+   * {@link #computeStageGroups(QueryDefinition, ControllerQueryKernelConfig)}.
+   */
+  public static boolean canStreamOutput(
+      final QueryDefinition queryDefinition,
+      final int stageNumber,
+      final ControllerQueryKernelConfig config,
+      final Map<StageId, Set<StageId>> outflowMap
+  )
+  {
+    if (config.isFaultTolerant()) {
+      // Cannot stream if fault tolerance is enabled: durable storage is 
required.
+      return false;
+    }
+
+    if (!config.isPipeline() || config.getMaxConcurrentStages() < 2) {
+      // Cannot stream if pipelining (& running multiple stages at once) is 
not enabled.
+      return false;
+    }
+
+    final StageId stageId = 
queryDefinition.getStageDefinition(stageNumber).getId();
+    final Set<StageId> outflowStageIds = outflowMap.get(stageId);
+
+    if (outflowStageIds.isEmpty()) {
+      return true;
+    } else if (outflowStageIds.size() == 1) {
+      final StageDefinition outflowStageDef =
+          
queryDefinition.getStageDefinition(Iterables.getOnlyElement(outflowStageIds));
+
+      // Two things happening here:
+      //   1) Stages cannot both stream and broadcast their output. This is 
because when streaming, we can only
+      //      support a single reader.
+      //   2) Stages can only receive a single streamed input. This isn't 
strictly necessary, but it simplifies the

Review Comment:
   What is the signifcance of streamed input? There is probably some gap in my 
understanding of streamed, but would we call the data lying in durable storage 
ready to be read as streamed? Logically the stage can stream the output in that 
case, and per my understanding, this code would allow that to happen, but I 
wouldn't call it "streamed" input.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerUtils.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Utilties for {@link ControllerQueryKernel}.
+ */
+public class ControllerUtils
+{
+  /**
+   * Put stages from {@link QueryDefinition} into groups that must each be 
launched simultaneously.
+   *
+   * This method's goal is to maximize the usage of {@link 
OutputChannelMode#MEMORY} channels, subject to constraints
+   * provided by {@link ControllerQueryKernelConfig#isPipeline()},
+   * {@link ControllerQueryKernelConfig#getMaxConcurrentStages()}, and
+   * {@link ControllerQueryKernelConfig#isFaultTolerant()}.
+   */
+  public static List<StageGroup> computeStageGroups(
+      final QueryDefinition queryDef,
+      final ControllerQueryKernelConfig config
+  )
+  {
+    final MSQDestination destination = config.getDestination();
+    final List<StageGroup> stageGroups = new ArrayList<>();
+    final boolean useDurableStorage = config.isDurableStorage();
+    final Map<StageId, Set<StageId>> inflow = computeStageInflowMap(queryDef);
+    final Map<StageId, Set<StageId>> outflow = 
computeStageOutflowMap(queryDef);
+    final Set<StageId> stagesRun = new HashSet<>();
+
+    while (stagesRun.size() < queryDef.getStageDefinitions().size()) {
+      // 1) Run all stages that cannot stream their output, as solo groups.
+      boolean didRun;
+      do {
+        didRun = false;
+
+        for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+          if (!stagesRun.contains(stageId)
+              && inflow.get(stageId).isEmpty()
+              && !canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+            stagesRun.add(stageId);
+            stageGroups.add(
+                new StageGroup(
+                    Collections.singletonList(stageId),
+                    getOutputChannelMode(
+                        queryDef,
+                        stageId.getStageNumber(),
+                        destination.toSelectDestination(),
+                        useDurableStorage,
+                        false
+                    )
+                )
+            );
+
+            removeStageFlow(stageId, inflow, outflow);
+            didRun = true;
+          }
+        }
+      } while (didRun);
+
+      // 2) Pick some stage that can stream its output, and run that as well 
as all ready-to-run dependents.
+      StageId currentStageId = null;
+      for (final StageId stageId : ImmutableList.copyOf(inflow.keySet())) {
+        if (!stagesRun.contains(stageId)
+            && inflow.get(stageId).isEmpty()
+            && canStreamOutput(queryDef, stageId.getStageNumber(), config, 
outflow)) {
+          currentStageId = stageId;
+          break;
+        }
+      }
+
+      if (currentStageId != null) {
+        final List<StageId> currentStageGroup = new ArrayList<>();
+        final int maxStageGroupSize;
+        if (stageGroups.isEmpty()) {
+          maxStageGroupSize = config.getMaxConcurrentStages();
+        } else {
+          final StageGroup priorGroup = stageGroups.get(stageGroups.size() - 
1);
+          if (priorGroup.lastStageOutputChannelMode() == 
OutputChannelMode.MEMORY) {
+            // Prior group must run concurrently with this group.
+            maxStageGroupSize = config.getMaxConcurrentStages() - 
priorGroup.size();
+          } else {
+            // Prior group can exit before this group starts.
+            maxStageGroupSize = config.getMaxConcurrentStages();
+          }
+        }
+
+        OutputChannelMode currentOutputChannelMode = null;
+        while (currentStageId != null) {
+          final boolean canStream = canStreamOutput(queryDef, 
currentStageId.getStageNumber(), config, outflow);
+          final Set<StageId> currentOutflow = outflow.get(currentStageId);
+
+          final int maxStageGroupSizeAllowingForDownstreamConsumer;
+          if 
(queryDef.getStageDefinition(currentStageId).doesSortDuringShuffle()) {
+            // When the current group sorts, there's a pipeline break, so we 
can "leapfrog": close the prior group
+            // before starting the downstream group.
+            maxStageGroupSizeAllowingForDownstreamConsumer = 
config.getMaxConcurrentStages() - 1;

Review Comment:
   Can something like following happen:
   a) Prior's group ouptut type is OutputChannelMode.MEMORY (and prior + this 
group must run together)
   b) The group sorts, therefore 
'maxStageGroupSizeAllowingForDownstreamConsumer' = maxConcurrentStages - 1
   c) The actual number of stages running simultaneously will be 
(priorGroup.size() + maxConcurrentStages - 1), which can be greater than 
maxConcurrentStages.
   
   If not, what's there to prevent it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to