paul-rogers commented on code in PR #12918:
URL: https://github.com/apache/druid/pull/12918#discussion_r950527010


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.guice.MultiStageQuery;
+import org.apache.druid.msq.indexing.error.CanceledFault;
+import org.apache.druid.msq.indexing.error.DurableStorageConfigurationFault;
+import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
+import org.apache.druid.msq.indexing.error.MSQErrorReport;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQFault;
+import org.apache.druid.msq.indexing.error.UnknownFault;
+import org.apache.druid.msq.indexing.error.WorkerFailedFault;
+import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
+import org.apache.druid.msq.statistics.KeyCollectorFactory;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshotDeserializerModule;
+import org.apache.druid.msq.statistics.KeyCollectors;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.storage.StorageConnector;
+
+import javax.annotation.Nullable;
+import java.util.UUID;
+
+public class MSQTasks
+{
+  /**
+   * Message used by {@link #makeErrorReport} when no other message is known.
+   */
+  static final String GENERIC_QUERY_FAILED_MESSAGE = "Query failed";
+
+  private static final String TASK_ID_PREFIX = "query-";
+
+  /**
+   * Returns a controller task ID given a
+   */
+  public static String controllerTaskId(@Nullable final String queryId)
+  {
+    return TASK_ID_PREFIX + (queryId == null ? UUID.randomUUID().toString() : 
queryId);
+  }
+
+  /**
+   * Returns a controller task ID given a
+   */
+  public static String workerTaskId(final String controllerTaskId, final int 
workerNumber)
+  {
+    return StringUtils.format("%s-worker%d", controllerTaskId, workerNumber);
+  }
+
+  /**
+   * If "Object" is a Long, returns it. Otherwise, throws an appropriate 
exception assuming this operation is
+   * being done to read the primary timestamp (__time) as part of an INSERT.
+   */
+  public static long primaryTimestampFromObjectForInsert(final Object 
timestamp)
+  {
+    if (timestamp instanceof Long) {
+      return (long) timestamp;
+    } else if (timestamp == null) {
+      throw new MSQException(InsertTimeNullFault.INSTANCE);
+    } else {
+      // Normally we expect the SQL layer to validate that __time for INSERT 
is a TIMESTAMP type, which would
+      // be a long at execution time. So a nice user-friendly message isn't 
needed here: it would only happen
+      // if the SQL layer is bypassed. Nice, friendly users wouldn't do that :)

Review Comment:
   This level of "behind the scenes" explanation is super helpful!



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSlice.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.input;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * Slice of an {@link InputSpec} assigned to a particular worker.
+ *
+ * On the controller, these are produced using {@link InputSpecSlicer}. On 
workers, these are read
+ * using {@link InputSliceReader}.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+public interface InputSlice
+{
+  /**
+   * Returns the number of files contained within this split. This is the same 
number that would be added to
+   * {@link org.apache.druid.msq.counters.CounterTracker} on full iteration 
through {@link InputSliceReader#attach}.
+   *
+   * May be zero for some kinds of slices, even if they contain data, if the 
input is not file-based.
+   */
+  int numFiles();

Review Comment:
   Nit: `fileCount` reads just a bit better than `numFiles`.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java:
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.groupby;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.SortColumn;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.kernel.MaxCountShuffleSpec;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.querykit.DataSourcePlan;
+import org.apache.druid.msq.querykit.QueryKit;
+import org.apache.druid.msq.querykit.QueryKitUtils;
+import org.apache.druid.msq.querykit.ShuffleSpecFactories;
+import org.apache.druid.msq.querykit.ShuffleSpecFactory;
+import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.having.AlwaysHavingSpec;
+import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
+import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
+import org.apache.druid.query.groupby.orderby.NoopLimitSpec;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
+import org.apache.druid.query.ordering.StringComparator;
+import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class GroupByQueryKit implements QueryKit<GroupByQuery>

Review Comment:
   A general comment on this approach is that we seem to be mapping from one 
big query definition (the native query) to another (the MSQ query definition). 
Other projects have had much success decomposing a query plan into isolated 
functional units (often called "operators") that feed into one another via a 
pipeline. A future direction here might be to do the same: have a frame 
processor hold a pipeline of operators for scan, sort, merge, limit, offset, 
etc. That might make the logic a bit more flexible and reusable.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.scan;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.SortColumn;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.kernel.MaxCountShuffleSpec;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.querykit.DataSourcePlan;
+import org.apache.druid.msq.querykit.QueryKit;
+import org.apache.druid.msq.querykit.QueryKitUtils;
+import org.apache.druid.msq.querykit.ShuffleSpecFactory;
+import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.rel.DruidQuery;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ScanQueryKit implements QueryKit<ScanQuery>
+{
+  private final ObjectMapper jsonMapper;
+
+  public ScanQueryKit(final ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
+  public static RowSignature getAndValidateSignature(final ScanQuery 
scanQuery, final ObjectMapper jsonMapper)
+  {
+    RowSignature scanSignature;
+    try {
+      final String s = 
scanQuery.getContextValue(DruidQuery.CTX_SCAN_SIGNATURE);
+      scanSignature = jsonMapper.readValue(s, RowSignature.class);
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+    // Verify the signature prior to any actual processing.
+    QueryKitUtils.verifyRowSignature(scanSignature);
+    return scanSignature;
+  }
+
+  /**
+   * We ignore the resultShuffleSpecFactory in case:
+   *  1. There is no cluster by
+   *  2. This is an offset which means everything gets funneled into a single 
partition hence we use MaxCountShuffleSpec
+   */
+  // No ordering, but there is a limit or an offset. These work by funneling 
everything through a single partition.
+  // So there is no point in forcing any particular partitioning. Since 
everything is funnelled into a single
+  // partition without a ClusterBy, we donot need to necessarily create it via 
the resultShuffleSpecFactory provided

Review Comment:
   Nit: "donot" -> "do not"



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.kernel.controller;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.QueryValidator;
+import org.apache.druid.msq.indexing.error.MSQFault;
+import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.InputSpecSlicerFactory;
+import org.apache.druid.msq.input.stage.ReadablePartitions;
+import org.apache.druid.msq.kernel.ExtraInfoHolder;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.kernel.WorkOrder;
+import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Kernel for the controller of a multi-stage query.
+ *
+ * Instances of this class are state machines for query execution. Kernels do 
not do any RPC or deal with any data.
+ * This separation of decision-making from the "real world" allows the 
decision-making to live in one,
+ * easy-to-follow place.
+ *
+ * @see org.apache.druid.msq.kernel.worker.WorkerStageKernel state machine on 
the worker side
+ */
+public class ControllerQueryKernel
+{
+  private final QueryDefinition queryDef;
+
+  // Stage ID -> tracker for that stage. An extension of the state of this 
kernel.
+  private final Map<StageId, ControllerStageTracker> stageTracker = new 
HashMap<>();
+
+  private Map<StageId, Set<StageId>> inflowMap = null; // Will be initialized 
to an unmodifiable map
+  private Map<StageId, Set<StageId>> outflowMap = null; // Will be initialized 
to an unmodifiable map
+
+  // Maintains a running map of (stageId -> pending inflow stages) which need 
to be completed to provision the stage
+  // corresponding to the stageId. After initializing,if the value of the 
entry becomes an empty set, it is removed from
+  // the map, and the removed entry is added to readyToRunStages
+  private Map<StageId, Set<StageId>> pendingInflowMap = null;
+
+  // Maintains a running count of (stageId -> outflow stages pending on its 
results). After initializing, if
+  // the value of the entry becomes an empty set, it is removed from the map 
and the removed entry is added to
+  // effectivelyFinishedStages
+  private Map<StageId, Set<StageId>> pendingOutflowMap = null;
+
+  // Tracks those stages which can be initialized safely.
+  private final Set<StageId> readyToRunStages = new HashSet<>();
+
+  // Tracks the stageIds which can be finished. Once returned by 
getEffectivelyFinishedStageKernels(), it gets cleared
+  // and not tracked anymore in this Set
+  private final Set<StageId> effectivelyFinishedStages = new HashSet<>(); // 
Modifiable map
+
+  public ControllerQueryKernel(final QueryDefinition queryDef)
+  {
+    this.queryDef = queryDef;
+    initializeStageDAGMaps();
+    initializeLeafStages();
+  }
+
+  /**
+   * Creates new kernels, if they can be initialized, and returns the tracked 
kernels which are in NEW phase
+   */
+  public List<StageId> createAndGetNewStageIds(
+      final InputSpecSlicerFactory slicerFactory,
+      final WorkerAssignmentStrategy assignmentStrategy
+  )
+  {
+    final Int2IntMap stageWorkerCountMap = new Int2IntAVLTreeMap();
+    final Int2ObjectMap<ReadablePartitions> stagePartitionsMap = new 
Int2ObjectAVLTreeMap<>();
+
+    for (final ControllerStageTracker stageKernel : stageTracker.values()) {
+      final int stageNumber = 
stageKernel.getStageDefinition().getStageNumber();
+      stageWorkerCountMap.put(stageNumber, 
stageKernel.getWorkerInputs().workerCount());
+
+      if (stageKernel.hasResultPartitions()) {
+        stagePartitionsMap.put(stageNumber, stageKernel.getResultPartitions());
+      }
+    }
+
+    createNewKernels(stageWorkerCountMap, 
slicerFactory.makeSlicer(stagePartitionsMap), assignmentStrategy);
+    return stageTracker.values()
+                       .stream()
+                       .filter(controllerStageTracker -> 
controllerStageTracker.getPhase() == ControllerStagePhase.NEW)
+                       .map(stageKernel -> 
stageKernel.getStageDefinition().getId())
+                       .collect(Collectors.toList());
+  }
+
+  /**
+   * @return Stage kernels in this query kernel which can be safely cleaned up 
and marked as FINISHED. This returns the
+   * kernel corresponding to a particular stage only once, to reduce the 
number of stages to iterate through.
+   * It is expectant of the caller to eventually mark the stage as {@link 
ControllerStagePhase#FINISHED} after fetching
+   * the stage kernel
+   */
+  public List<StageId> getEffectivelyFinishedStageIds()
+  {
+    return ImmutableList.copyOf(effectivelyFinishedStages);
+  }
+
+  /**
+   * Returns all the kernels which have been initialized and are being tracked
+   */
+  public List<StageId> getActiveStages()
+  {
+    return ImmutableList.copyOf(stageTracker.keySet());
+  }
+
+  /**
+   * Returns a stage's kernel corresponding to a particular stage number
+   */
+  public StageId getStageId(final int stageNumber)
+  {
+    return new StageId(queryDef.getQueryId(), stageNumber);
+  }
+
+  /**
+   * Returns true if query needs no further processing, i.e. if final stage is 
successful or if any of the stages have
+   * been failed
+   */
+  public boolean isDone()
+  {
+    return 
Optional.ofNullable(stageTracker.get(queryDef.getFinalStageDefinition().getId()))
+                   .filter(tracker -> 
ControllerStagePhase.isSuccessfulTerminalPhase(tracker.getPhase()))
+                   .isPresent()
+           || stageTracker.values().stream().anyMatch(tracker -> 
tracker.getPhase() == ControllerStagePhase.FAILED);
+  }
+
+  /**
+   * Marks all the successful terminal stages to completion, so that the 
queryKernel shows a canonical view of
+   * phases of the stages once it completes
+   */
+  public void markSuccessfulTerminalStagesAsFinished()
+  {
+    for (final StageId stageId : getActiveStages()) {
+      ControllerStagePhase phase = getStagePhase(stageId);
+      // While the following conditional is redundant currently, it makes 
logical sense to mark all the "successful
+      // terminal phases" to FINISHED at the end, hence the if clause. Inside 
the conditional, depending on the
+      // terminal phase it resides in, we synthetically mark it to completion 
(and therefore we need to check which
+      // stage it is precisely in)
+      if (ControllerStagePhase.isSuccessfulTerminalPhase(phase)) {
+        if (phase == ControllerStagePhase.RESULTS_READY) {
+          finishStage(stageId, false);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns true if all the stages comprising the query definition have been 
sucessful in producing their results
+   */
+  public boolean isSuccess()
+  {
+    return stageTracker.size() == queryDef.getStageDefinitions().size()
+           && stageTracker.values()
+                          .stream()
+                          .allMatch(tracker -> 
ControllerStagePhase.isSuccessfulTerminalPhase(tracker.getPhase()));
+  }
+
+  /**
+   * Creates a list of work orders, corresponding to each worker, for a 
particular stageNumber
+   */
+  public Int2ObjectMap<WorkOrder> createWorkOrders(
+      final int stageNumber,
+      @Nullable final Int2ObjectMap<Object> extraInfos
+  )
+  {
+    final Int2ObjectMap<WorkOrder> retVal = new Int2ObjectAVLTreeMap<>();
+    final ControllerStageTracker stageKernel = 
getStageKernelOrThrow(getStageId(stageNumber));
+
+    final WorkerInputs workerInputs = stageKernel.getWorkerInputs();
+    for (int workerNumber : workerInputs.workers()) {
+      final Object extraInfo = extraInfos != null ? 
extraInfos.get(workerNumber) : null;
+
+      //noinspection unchecked
+      final ExtraInfoHolder<?> extraInfoHolder =
+          
stageKernel.getStageDefinition().getProcessorFactory().makeExtraInfoHolder(extraInfo);
+
+      final WorkOrder workOrder = new WorkOrder(
+          queryDef,
+          stageNumber,
+          workerNumber,
+          workerInputs.inputsForWorker(workerNumber),
+          extraInfoHolder
+      );
+
+      QueryValidator.validateWorkOrder(workOrder);
+      retVal.put(workerNumber, workOrder);
+    }
+
+    return retVal;
+  }
+
+  private void createNewKernels(
+      final Int2IntMap stageWorkerCountMap,
+      final InputSpecSlicer slicer,
+      final WorkerAssignmentStrategy assignmentStrategy
+  )
+  {
+    for (final StageId nextStage : readyToRunStages) {
+      // Create a tracker.
+      final StageDefinition stageDef = queryDef.getStageDefinition(nextStage);
+      final ControllerStageTracker stageKernel = ControllerStageTracker.create(
+          stageDef,
+          stageWorkerCountMap,
+          slicer,
+          assignmentStrategy
+      );
+      stageTracker.put(nextStage, stageKernel);
+    }
+
+    readyToRunStages.clear();
+  }
+
+  /**
+   * Populates the inflowMap, outflowMap and pending inflow/outflow maps 
corresponding to the query definition
+   */
+  private void initializeStageDAGMaps()
+  {
+    initializeStageOutflowMap(this.queryDef);
+    initializeStageInflowMap(this.queryDef);
+  }
+
+  /**
+   * Initializes this.outflowMap with a mapping of stage -> stages that depend 
on that stage.
+   */
+  private void initializeStageOutflowMap(final QueryDefinition queryDefinition)
+  {
+    Preconditions.checkArgument(this.outflowMap == null, "outflow map must 
only be built once");
+    final Map<StageId, Set<StageId>> retVal = new HashMap<>();
+    this.pendingOutflowMap = new HashMap<>();
+    for (final StageDefinition stageDef : 
queryDefinition.getStageDefinitions()) {
+      final StageId stageId = stageDef.getId();
+      retVal.computeIfAbsent(stageId, ignored -> new HashSet<>());
+      this.pendingOutflowMap.computeIfAbsent(stageId, ignored -> new 
HashSet<>());
+      for (final int inputStageNumber : 
queryDefinition.getStageDefinition(stageId).getInputStageNumbers()) {
+        final StageId inputStageId = new StageId(queryDef.getQueryId(), 
inputStageNumber);
+        retVal.computeIfAbsent(inputStageId, ignored -> new 
HashSet<>()).add(stageId);
+        this.pendingOutflowMap.computeIfAbsent(inputStageId, ignored -> new 
HashSet<>()).add(stageId);
+      }
+    }
+    this.outflowMap = Collections.unmodifiableMap(retVal);
+  }
+
+  /**
+   * Initializes this.inflowMap with a mapping of stage -> stages that flow 
*into* that stage.
+   */
+  private void initializeStageInflowMap(final QueryDefinition queryDefinition)
+  {
+    final Map<StageId, Set<StageId>> retVal = new HashMap<>();
+    this.pendingInflowMap = new HashMap<>();
+    for (final StageDefinition stageDef : 
queryDefinition.getStageDefinitions()) {
+      final StageId stageId = stageDef.getId();
+      retVal.computeIfAbsent(stageId, ignored -> new HashSet<>());
+      this.pendingInflowMap.computeIfAbsent(stageId, ignored -> new 
HashSet<>());
+      for (final int inputStageNumber : 
queryDefinition.getStageDefinition(stageId).getInputStageNumbers()) {
+        final StageId inputStageId = new StageId(queryDef.getQueryId(), 
inputStageNumber);
+        retVal.computeIfAbsent(stageId, ignored -> new 
HashSet<>()).add(inputStageId);
+        this.pendingInflowMap.computeIfAbsent(stageId, ignored -> new 
HashSet<>()).add(inputStageId);
+      }
+    }
+    this.inflowMap = Collections.unmodifiableMap(retVal);
+  }
+
+  /**
+   * Adds stageIds for those stages which donot require any input from any 
other stages

Review Comment:
   Nit "donot" -> "do not"



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MSQTasks.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.guice.MultiStageQuery;
+import org.apache.druid.msq.indexing.error.CanceledFault;
+import org.apache.druid.msq.indexing.error.DurableStorageConfigurationFault;
+import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
+import org.apache.druid.msq.indexing.error.MSQErrorReport;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQFault;
+import org.apache.druid.msq.indexing.error.UnknownFault;
+import org.apache.druid.msq.indexing.error.WorkerFailedFault;
+import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
+import org.apache.druid.msq.statistics.KeyCollectorFactory;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshot;
+import org.apache.druid.msq.statistics.KeyCollectorSnapshotDeserializerModule;
+import org.apache.druid.msq.statistics.KeyCollectors;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.storage.StorageConnector;
+
+import javax.annotation.Nullable;
+import java.util.UUID;
+
+public class MSQTasks
+{
+  /**
+   * Message used by {@link #makeErrorReport} when no other message is known.
+   */
+  static final String GENERIC_QUERY_FAILED_MESSAGE = "Query failed";
+
+  private static final String TASK_ID_PREFIX = "query-";
+
+  /**
+   * Returns a controller task ID given a
+   */
+  public static String controllerTaskId(@Nullable final String queryId)
+  {
+    return TASK_ID_PREFIX + (queryId == null ? UUID.randomUUID().toString() : 
queryId);
+  }
+
+  /**
+   * Returns a controller task ID given a

Review Comment:
   Nit: "given a"... what?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.querykit.DataSegmentProvider;
+import org.apache.druid.msq.querykit.LazyResourceHolder;
+import org.apache.druid.msq.rpc.CoordinatorServiceClient;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.loading.SegmentCacheManager;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Production implementation of {@link DataSegmentProvider} using Coordinator 
APIs.
+ */
+public class TaskDataSegmentProvider implements DataSegmentProvider
+{
+  private final CoordinatorServiceClient coordinatorClient;
+  private final SegmentCacheManager segmentCacheManager;
+  private final IndexIO indexIO;
+
+  public TaskDataSegmentProvider(
+      CoordinatorServiceClient coordinatorClient,
+      SegmentCacheManager segmentCacheManager,
+      IndexIO indexIO
+  )
+  {
+    this.coordinatorClient = coordinatorClient;
+    this.segmentCacheManager = segmentCacheManager;
+    this.indexIO = indexIO;
+  }
+
+  @Override
+  public LazyResourceHolder<Segment> fetchSegment(
+      final SegmentId segmentId,
+      final ChannelCounters channelCounters
+  )
+  {
+    try {
+      // Use LazyResourceHolder so Coordinator call and segment downloads 
happen in processing threads,
+      // rather than the main thread.
+      return new LazyResourceHolder<>(
+          () -> {
+            try {
+              final DataSegment dataSegment = FutureUtils.getUnchecked(
+                  coordinatorClient.fetchUsedSegment(
+                      segmentId.getDataSource(),
+                      segmentId.toString()
+                  ),
+                  true
+              );
+
+              final Closer closer = Closer.create();
+              final File segmentDir = 
segmentCacheManager.getSegmentFiles(dataSegment);
+              closer.register(() -> FileUtils.deleteDirectory(segmentDir));
+
+              final QueryableIndex index = indexIO.loadIndex(segmentDir);
+              final int numRows = index.getNumRows();
+              final long size = dataSegment.getSize();
+              closer.register(() -> channelCounters.addFile(numRows, size));
+              closer.register(index);
+              return Pair.of(new QueryableIndexSegment(index, 
dataSegment.getId()), closer);
+            }
+            catch (IOException | SegmentLoadingException e) {
+              throw new RuntimeException(e);

Review Comment:
   Should we include, in the error, information about the segment that failed to 
download? And should we differentiate between a Coordinator failure and a 
download failure?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -0,0 +1,1230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.channel.ReadableFileFrameChannel;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.ReadableNilFrameChannel;
+import org.apache.druid.frame.file.FrameFile;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.frame.processor.BlockingQueueOutputChannelFactory;
+import org.apache.druid.frame.processor.Bouncer;
+import org.apache.druid.frame.processor.FileOutputChannelFactory;
+import org.apache.druid.frame.processor.FrameChannelMuxer;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.frame.processor.OutputChannel;
+import org.apache.druid.frame.processor.OutputChannelFactory;
+import org.apache.druid.frame.processor.OutputChannels;
+import org.apache.druid.frame.processor.SuperSorter;
+import org.apache.druid.frame.processor.SuperSorterProgressTracker;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.counters.CounterNames;
+import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.counters.CounterTracker;
+import org.apache.druid.msq.indexing.CountingOutputChannelFactory;
+import org.apache.druid.msq.indexing.InputChannelFactory;
+import org.apache.druid.msq.indexing.InputChannelsImpl;
+import org.apache.druid.msq.indexing.KeyStatisticsCollectionProcessor;
+import org.apache.druid.msq.indexing.MSQWorkerTask;
+import org.apache.druid.msq.indexing.error.CanceledFault;
+import org.apache.druid.msq.indexing.error.MSQErrorReport;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQWarningReportLimiterPublisher;
+import org.apache.druid.msq.indexing.error.MSQWarningReportPublisher;
+import org.apache.druid.msq.indexing.error.MSQWarningReportSimplePublisher;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSliceReader;
+import org.apache.druid.msq.input.InputSlices;
+import org.apache.druid.msq.input.MapInputSliceReader;
+import org.apache.druid.msq.input.NilInputSlice;
+import org.apache.druid.msq.input.NilInputSliceReader;
+import org.apache.druid.msq.input.external.ExternalInputSlice;
+import org.apache.druid.msq.input.external.ExternalInputSliceReader;
+import org.apache.druid.msq.input.stage.InputChannels;
+import org.apache.druid.msq.input.stage.ReadablePartition;
+import org.apache.druid.msq.input.stage.StageInputSlice;
+import org.apache.druid.msq.input.stage.StageInputSliceReader;
+import org.apache.druid.msq.input.table.SegmentsInputSlice;
+import org.apache.druid.msq.input.table.SegmentsInputSliceReader;
+import org.apache.druid.msq.kernel.FrameContext;
+import org.apache.druid.msq.kernel.FrameProcessorFactory;
+import org.apache.druid.msq.kernel.ProcessorsAndChannels;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.msq.kernel.WorkOrder;
+import org.apache.druid.msq.kernel.worker.WorkerStageKernel;
+import org.apache.druid.msq.kernel.worker.WorkerStagePhase;
+import org.apache.druid.msq.querykit.DataSegmentProvider;
+import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory;
+import org.apache.druid.msq.shuffle.DurableStorageOutputChannelFactory;
+import org.apache.druid.msq.shuffle.WorkerInputChannelFactory;
+import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.util.DecoratedExecutorService;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.PrioritizedCallable;
+import org.apache.druid.query.PrioritizedRunnable;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.server.DruidNode;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Interface for a worker of a multi-stage query.
+ */
+public class WorkerImpl implements Worker
+{
+  private static final Logger log = new Logger(WorkerImpl.class);
+
+  private final MSQWorkerTask task;
+  private final WorkerContext context;
+
+  private final BlockingQueue<Consumer<KernelHolder>> kernelManipulationQueue 
= new LinkedBlockingDeque<>();
+  private final ConcurrentHashMap<StageId, ConcurrentHashMap<Integer, 
ReadableFrameChannel>> stageOutputs = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<StageId, CounterTracker> stageCounters = new 
ConcurrentHashMap<>();
+  private final boolean durableStageStorageEnabled;
+
+  private volatile DruidNode selfDruidNode;
+  private volatile ControllerClient controllerClient;
+  private volatile WorkerClient workerClient;
+  private volatile Bouncer processorBouncer;
+  private volatile boolean controllerAlive = true;

Review Comment:
   Since these are marked volatile, it means that we have to worry about 
concurrent reads and writes. These, however, seem fundamental. Is it true that, 
say, a worker starts running and these values are then concurrently set at some 
random time later? If so, should there be some state or locking to enforce 
ordering? (I.e. don't do "real work" until the fundamentals are provided.)
   
   Or, is it the case that there is an implied state machine: that these are 
set in one thread (say, via a work order or some such?), and only then does 
another thread consume them? If so, do they need to be volatile? What is the 
sync mechanism?
   
   Reading further, it appears these are set in `runTask` based on the context. 
Yet, the context is available in the constructor. Should at least the 
`selfDruidNode` be set in the constructor and be final?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -0,0 +1,1230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.channel.ReadableFileFrameChannel;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.ReadableNilFrameChannel;
+import org.apache.druid.frame.file.FrameFile;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.frame.processor.BlockingQueueOutputChannelFactory;
+import org.apache.druid.frame.processor.Bouncer;
+import org.apache.druid.frame.processor.FileOutputChannelFactory;
+import org.apache.druid.frame.processor.FrameChannelMuxer;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.frame.processor.OutputChannel;
+import org.apache.druid.frame.processor.OutputChannelFactory;
+import org.apache.druid.frame.processor.OutputChannels;
+import org.apache.druid.frame.processor.SuperSorter;
+import org.apache.druid.frame.processor.SuperSorterProgressTracker;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.counters.CounterNames;
+import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.counters.CounterTracker;
+import org.apache.druid.msq.indexing.CountingOutputChannelFactory;
+import org.apache.druid.msq.indexing.InputChannelFactory;
+import org.apache.druid.msq.indexing.InputChannelsImpl;
+import org.apache.druid.msq.indexing.KeyStatisticsCollectionProcessor;
+import org.apache.druid.msq.indexing.MSQWorkerTask;
+import org.apache.druid.msq.indexing.error.CanceledFault;
+import org.apache.druid.msq.indexing.error.MSQErrorReport;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQWarningReportLimiterPublisher;
+import org.apache.druid.msq.indexing.error.MSQWarningReportPublisher;
+import org.apache.druid.msq.indexing.error.MSQWarningReportSimplePublisher;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSliceReader;
+import org.apache.druid.msq.input.InputSlices;
+import org.apache.druid.msq.input.MapInputSliceReader;
+import org.apache.druid.msq.input.NilInputSlice;
+import org.apache.druid.msq.input.NilInputSliceReader;
+import org.apache.druid.msq.input.external.ExternalInputSlice;
+import org.apache.druid.msq.input.external.ExternalInputSliceReader;
+import org.apache.druid.msq.input.stage.InputChannels;
+import org.apache.druid.msq.input.stage.ReadablePartition;
+import org.apache.druid.msq.input.stage.StageInputSlice;
+import org.apache.druid.msq.input.stage.StageInputSliceReader;
+import org.apache.druid.msq.input.table.SegmentsInputSlice;
+import org.apache.druid.msq.input.table.SegmentsInputSliceReader;
+import org.apache.druid.msq.kernel.FrameContext;
+import org.apache.druid.msq.kernel.FrameProcessorFactory;
+import org.apache.druid.msq.kernel.ProcessorsAndChannels;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.msq.kernel.WorkOrder;
+import org.apache.druid.msq.kernel.worker.WorkerStageKernel;
+import org.apache.druid.msq.kernel.worker.WorkerStagePhase;
+import org.apache.druid.msq.querykit.DataSegmentProvider;
+import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory;
+import org.apache.druid.msq.shuffle.DurableStorageOutputChannelFactory;
+import org.apache.druid.msq.shuffle.WorkerInputChannelFactory;
+import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
+import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
+import org.apache.druid.msq.util.DecoratedExecutorService;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.PrioritizedCallable;
+import org.apache.druid.query.PrioritizedRunnable;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.server.DruidNode;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Interface for a worker of a multi-stage query.
+ */
+public class WorkerImpl implements Worker
+{
+  private static final Logger log = new Logger(WorkerImpl.class);
+
+  private final MSQWorkerTask task;
+  private final WorkerContext context;
+
+  private final BlockingQueue<Consumer<KernelHolder>> kernelManipulationQueue 
= new LinkedBlockingDeque<>();
+  private final ConcurrentHashMap<StageId, ConcurrentHashMap<Integer, 
ReadableFrameChannel>> stageOutputs = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<StageId, CounterTracker> stageCounters = new 
ConcurrentHashMap<>();
+  private final boolean durableStageStorageEnabled;
+
+  private volatile DruidNode selfDruidNode;
+  private volatile ControllerClient controllerClient;
+  private volatile WorkerClient workerClient;
+  private volatile Bouncer processorBouncer;
+  private volatile boolean controllerAlive = true;
+
+  public WorkerImpl(MSQWorkerTask task, WorkerContext context)
+  {
+    this.task = task;
+    this.context = context;
+    this.durableStageStorageEnabled = 
MultiStageQueryContext.isDurableStorageEnabled(task.getContext());
+  }
+
+  @Override
+  public String id()
+  {
+    return task.getId();
+  }
+
+  @Override
+  public MSQWorkerTask task()
+  {
+    return task;
+  }
+
+  @Override
+  public TaskStatus run() throws Exception
+  {
+    try (final Closer closer = Closer.create()) {
+      Optional<MSQErrorReport> maybeErrorReport;
+
+      try {
+        maybeErrorReport = runTask(closer);
+      }
+      catch (Throwable e) {
+        maybeErrorReport = Optional.of(
+            MSQErrorReport.fromException(id(), 
MSQTasks.getHostFromSelfNode(selfDruidNode), null, e)
+        );
+      }
+
+      if (maybeErrorReport.isPresent()) {
+        final MSQErrorReport errorReport = maybeErrorReport.get();
+        final String errorLogMessage = 
MSQTasks.errorReportToLogMessage(errorReport);
+        log.warn(errorLogMessage);
+
+        closer.register(() -> {
+          if (controllerAlive && controllerClient != null && selfDruidNode != 
null) {
+            controllerClient.postWorkerError(id(), errorReport);
+          }
+        });
+
+        return TaskStatus.failure(id(), 
errorReport.getFault().getCodeWithMessage());
+      } else {
+        return TaskStatus.success(id());
+      }
+    }
+  }
+
+  /**
+   * Runs worker logic. Returns an empty Optional on success. On failure, 
returns an error report for errors that
+   * happened in other threads; throws exceptions for errors that happened in 
the main worker loop.
+   */
+  public Optional<MSQErrorReport> runTask(final Closer closer) throws Exception
+  {
+    this.selfDruidNode = context.selfNode();
+    this.controllerClient = 
context.makeControllerClient(task.getControllerTaskId());
+    closer.register(controllerClient::close);
+    context.registerWorker(this, closer); // Uses controllerClient, so must be 
called after that is initialized
+    this.workerClient = new 
ExceptionWrappingWorkerClient(context.makeWorkerClient());
+    closer.register(workerClient::close);
+    this.processorBouncer = context.processorBouncer();
+
+    final KernelHolder kernelHolder = new KernelHolder();
+    final String cancellationId = id();
+
+    final FrameProcessorExecutor workerExec = new 
FrameProcessorExecutor(makeProcessingPool());
+
+    // Delete all the stage outputs
+    closer.register(() -> {
+      for (final StageId stageId : stageOutputs.keySet()) {
+        cleanStageOutput(stageId);
+      }
+    });
+
+    // Close stage output processors and running futures (if present)
+    closer.register(() -> {
+      try {
+        workerExec.cancel(cancellationId);
+      }
+      catch (InterruptedException e) {
+        // Strange that cancellation would itself be interrupted. Throw an 
exception, since this is unexpected.
+        throw new RuntimeException(e);
+      }
+    });
+
+    final MSQWarningReportPublisher msqWarningReportPublisher = new 
MSQWarningReportLimiterPublisher(
+        new MSQWarningReportSimplePublisher(
+            id(),
+            controllerClient,
+            id(),
+            MSQTasks.getHostFromSelfNode(selfDruidNode)
+        )
+    );
+
+    closer.register(msqWarningReportPublisher);
+
+    final Map<StageId, SettableFuture<ClusterByPartitions>> 
partitionBoundariesFutureMap = new HashMap<>();
+
+    final Map<StageId, FrameContext> stageFrameContexts = new HashMap<>();
+
+    while (!kernelHolder.isDone()) {
+      boolean didSomething = false;
+
+      for (final WorkerStageKernel kernel : 
kernelHolder.getStageKernelMap().values()) {
+        final StageDefinition stageDefinition = kernel.getStageDefinition();
+
+        if (kernel.getPhase() == WorkerStagePhase.NEW) {
+          log.debug("New work order: %s", 
context.jsonMapper().writeValueAsString(kernel.getWorkOrder()));
+
+          // Create separate inputChannelFactory per stage, because the list 
of tasks can grow between stages, and
+          // so we need to avoid the memoization in baseInputChannelFactory.
+          final InputChannelFactory inputChannelFactory = 
makeBaseInputChannelFactory(closer);
+
+          // Compute memory parameters for all stages, even ones that haven't 
been assigned yet, so we can fail-fast
+          // if some won't work. (We expect that all stages will get assigned 
to the same pool of workers.)
+          for (final StageDefinition stageDef : 
kernel.getWorkOrder().getQueryDefinition().getStageDefinitions()) {
+            stageFrameContexts.computeIfAbsent(
+                stageDef.getId(),
+                stageId -> context.frameContext(
+                    kernel.getWorkOrder().getQueryDefinition(),
+                    stageId.getStageNumber()
+                )
+            );
+          }
+
+          // Start working on this stage immediately.
+          kernel.startReading();
+          final SettableFuture<ClusterByPartitions> partitionBoundariesFuture =
+              startWorkOrder(
+                  kernel,
+                  inputChannelFactory,
+                  stageCounters.computeIfAbsent(stageDefinition.getId(), 
ignored -> new CounterTracker()),
+                  workerExec,
+                  cancellationId,
+                  context.threadCount(),
+                  stageFrameContexts.get(stageDefinition.getId()),
+                  msqWarningReportPublisher
+              );
+
+          if (partitionBoundariesFuture != null) {
+            if (partitionBoundariesFutureMap.put(stageDefinition.getId(), 
partitionBoundariesFuture) != null) {
+              throw new ISE("Work order collision for stage [%s]", 
stageDefinition.getId());
+            }
+          }
+
+          didSomething = true;
+          logKernelStatus(kernelHolder.getStageKernelMap().values());
+        }
+
+        if (kernel.getPhase() == WorkerStagePhase.READING_INPUT && 
kernel.hasResultKeyStatisticsSnapshot()) {
+          if (controllerAlive) {
+            controllerClient.postKeyStatistics(
+                stageDefinition.getId(),
+                kernel.getWorkOrder().getWorkerNumber(),
+                kernel.getResultKeyStatisticsSnapshot()
+            );
+          }
+          kernel.startPreshuffleWaitingForResultPartitionBoundaries();
+
+          didSomething = true;
+          logKernelStatus(kernelHolder.getStageKernelMap().values());
+        }
+
+        logKernelStatus(kernelHolder.getStageKernelMap().values());
+        if (kernel.getPhase() == 
WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES
+            && kernel.hasResultPartitionBoundaries()) {
+          
partitionBoundariesFutureMap.get(stageDefinition.getId()).set(kernel.getResultPartitionBoundaries());
+          kernel.startPreshuffleWritingOutput();
+
+          didSomething = true;
+          logKernelStatus(kernelHolder.getStageKernelMap().values());
+        }
+
+        if (kernel.getPhase() == WorkerStagePhase.RESULTS_READY
+            && 
kernel.addPostedResultsComplete(Pair.of(stageDefinition.getId(), 
kernel.getWorkOrder().getWorkerNumber()))) {
+          if (controllerAlive) {
+            controllerClient.postResultsComplete(
+                stageDefinition.getId(),
+                kernel.getWorkOrder().getWorkerNumber(),
+                kernel.getResultObject()
+            );
+          }
+        }
+
+        if (kernel.getPhase() == WorkerStagePhase.FAILED) {
+          // Better than throwing an exception, because we can include the 
stage number.
+          return Optional.of(
+              MSQErrorReport.fromException(
+                  id(),
+                  MSQTasks.getHostFromSelfNode(selfDruidNode),
+                  stageDefinition.getId().getStageNumber(),
+                  kernel.getException()
+              )
+          );
+        }
+      }
+
+      if (!didSomething && !kernelHolder.isDone()) {
+        Consumer<KernelHolder> nextCommand;
+
+        do {
+          postCountersToController();
+        } while ((nextCommand = kernelManipulationQueue.poll(5, 
TimeUnit.SECONDS)) == null);
+
+        nextCommand.accept(kernelHolder);
+        logKernelStatus(kernelHolder.getStageKernelMap().values());
+      }
+    }
+
+    // Empty means success.
+    return Optional.empty();
+  }
+
+  @Override
+  public void stopGracefully()
+  {
+    kernelManipulationQueue.add(
+        kernel -> {
+          // stopGracefully() is called when the containing process is 
terminated, or when the task is canceled.
+          throw new MSQException(CanceledFault.INSTANCE);
+        }
+    );
+  }
+
+  @Override
+  public void controllerFailed()
+  {
+    controllerAlive = false;
+    stopGracefully();
+  }
+
+  @Override
+  public InputStream readChannel(
+      final String queryId,
+      final int stageNumber,
+      final int partitionNumber,
+      final long offset
+  ) throws IOException
+  {
+    final StageId stageId = new StageId(queryId, stageNumber);
+    final StagePartition stagePartition = new StagePartition(stageId, 
partitionNumber);
+    final ConcurrentHashMap<Integer, ReadableFrameChannel> 
partitionOutputsForStage = stageOutputs.get(stageId);
+
+    if (partitionOutputsForStage == null) {
+      return null;
+    }
+    final ReadableFrameChannel channel = 
partitionOutputsForStage.get(partitionNumber);
+
+    if (channel == null) {
+      return null;
+    }
+
+    if (channel instanceof ReadableNilFrameChannel) {
+      // Build an empty frame file.
+      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      FrameFileWriter.open(Channels.newChannel(baos), null).close();
+
+      final ByteArrayInputStream in = new 
ByteArrayInputStream(baos.toByteArray());
+
+      //noinspection ResultOfMethodCallIgnored: OK to ignore since "skip" 
always works for ByteArrayInputStream.
+      in.skip(offset);
+
+      return in;
+    } else if (channel instanceof ReadableFileFrameChannel) {
+      // Close frameFile once we've returned an input stream: no need to 
retain a reference to the mmap after that,
+      // since we aren't using it.
+      try (final FrameFile frameFile = ((ReadableFileFrameChannel) 
channel).newFrameFileReference()) {
+        final RandomAccessFile randomAccessFile = new 
RandomAccessFile(frameFile.file(), "r");
+
+        if (offset >= randomAccessFile.length()) {
+          randomAccessFile.close();
+          return new ByteArrayInputStream(ByteArrays.EMPTY_ARRAY);
+        } else {
+          randomAccessFile.seek(offset);
+          return Channels.newInputStream(randomAccessFile.getChannel());
+        }
+      }
+    } else {
+      String errorMsg = StringUtils.format(
+          "Returned server error to client because channel for [%s] is not nil 
or file-based (class = %s)",
+          stagePartition,
+          channel.getClass().getName()
+      );
+      log.error(StringUtils.encodeForFormat(errorMsg));
+
+      throw new IOException(errorMsg);
+    }
+  }
+
+  @Override
+  public void postWorkOrder(final WorkOrder workOrder)
+  {
+    if (task.getWorkerNumber() != workOrder.getWorkerNumber()) {
+      throw new ISE("Worker number mismatch: expected [%d]", 
task.getWorkerNumber());
+    }
+
+    kernelManipulationQueue.add(
+        kernelHolder ->
+            kernelHolder.getStageKernelMap().computeIfAbsent(
+                workOrder.getStageDefinition().getId(),
+                ignored -> WorkerStageKernel.create(workOrder)
+            )
+    );
+  }
+
+  @Override
+  public boolean postResultPartitionBoundaries(
+      final ClusterByPartitions stagePartitionBoundaries,
+      final String queryId,
+      final int stageNumber
+  )
+  {
+    final StageId stageId = new StageId(queryId, stageNumber);
+
+    kernelManipulationQueue.add(
+        kernelHolder -> {
+          final WorkerStageKernel stageKernel = 
kernelHolder.getStageKernelMap().get(stageId);
+
+          // Ignore the update if we don't have a kernel for this stage.
+          if (stageKernel != null) {
+            stageKernel.setResultPartitionBoundaries(stagePartitionBoundaries);
+          } else {
+            log.warn("Ignored result partition boundaries call for unknown 
stage [%s]", stageId);
+          }
+        }
+    );
+    return true;
+  }
+
+  @Override
+  public void postCleanupStage(final StageId stageId)
+  {
+    log.info("Cleanup order for stage: [%s] received", stageId);
+    kernelManipulationQueue.add(
+        holder -> {
+          cleanStageOutput(stageId);
+          // Mark the stage as FINISHED
+          holder.getStageKernelMap().get(stageId).setStageFinished();
+        }
+    );
+  }
+
+  @Override
+  public void postFinish()
+  {
+    kernelManipulationQueue.add(KernelHolder::setDone);
+  }
+
+  @Override
+  public CounterSnapshotsTree getCounters()
+  {
+    final CounterSnapshotsTree retVal = new CounterSnapshotsTree();
+
+    for (final Map.Entry<StageId, CounterTracker> entry : 
stageCounters.entrySet()) {
+      retVal.put(entry.getKey().getStageNumber(), task().getWorkerNumber(), 
entry.getValue().snapshot());
+    }
+
+    return retVal;
+  }
+
+  private InputChannelFactory makeBaseInputChannelFactory(final Closer closer)
+  {
+    final Supplier<List<String>> workerTaskList = Suppliers.memoize(
+        () -> {
+          try {
+            return controllerClient.getTaskList();
+          }
+          catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+    )::get;
+
+    if (durableStageStorageEnabled) {
+      return DurableStorageInputChannelFactory.createStandardImplementation(
+          task.getControllerTaskId(),
+          workerTaskList,
+          MSQTasks.makeStorageConnector(context.injector()),
+          closer
+      );
+    } else {
+      return new WorkerOrLocalInputChannelFactory(workerTaskList);
+    }
+  }
+
+  private OutputChannelFactory makeStageOutputChannelFactory(final 
FrameContext frameContext, final int stageNumber)
+  {
+    // Use the standard frame size, since we assume this size when computing 
how much is needed to merge output
+    // files from different workers.
+    final int frameSize = 
frameContext.memoryParameters().getStandardFrameSize();
+
+    if (durableStageStorageEnabled) {
+      return DurableStorageOutputChannelFactory.createStandardImplementation(
+          task.getControllerTaskId(),
+          id(),
+          stageNumber,
+          frameSize,
+          MSQTasks.makeStorageConnector(context.injector())
+      );
+    } else {
+      final File fileChannelDirectory =
+          new File(context.tempDir(), StringUtils.format("output_stage_%06d", 
stageNumber));
+
+      return new FileOutputChannelFactory(fileChannelDirectory, frameSize);
+    }
+  }
+
+  private ListeningExecutorService makeProcessingPool()

Review Comment:
   For us newbies, maybe add a note to explain what the processing pool is. Or, 
point to where this is explained.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to