gianm commented on code in PR #13353:
URL: https://github.com/apache/druid/pull/13353#discussion_r1027845939


##########
docs/multi-stage-query/known-issues.md:
##########
@@ -29,7 +29,7 @@ sidebar_label: Known issues
 
 ## Multi-stage query task runtime
 
-- Fault tolerance is not implemented. If any task fails, the entire query fails.
+- Fault tolerance is partially implemented. Workers get relaunched when they are killed unexpectedly. The controller does not get relaunched if its killed unexpectedly.

Review Comment:
   "it is killed unexpectedly" (spelling)



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -564,6 +585,34 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
     return queryDef;
   }
 
+  /**
+   * Adds the workorders for worker to {@link ControllerImpl#workOrdersToRetry} if the {@link ControllerQueryKernel} determines that there
+   * are work orders which needs reprocessing.
+   */
+  private void addToRetryQueue(ControllerQueryKernel kernel, int worker, MSQFault fault)
+  {
+    List<WorkOrder> retriableWorkOrders = kernel.getWorkInCaseWorkerElgibileForRetryElseThrow(worker, fault);

Review Comment:
   Eligible (spelling)



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -44,18 +52,20 @@
 import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;

Review Comment:
   ControllerQueryKernel is supposed to be a simple class that doesn't need to 
be thread-safe, because it's only accessed from the main logic thread of the 
controller. It's suspicious that ConcurrentHashMap is introduced here. What's 
it needed for? Can we achieve the same thing while keeping the 
simple/non-threadsafe nature of this class?
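One way to preserve the single-threaded nature the reviewer describes is to keep the retry bookkeeping on a plain `HashMap` and require that every mutation run on the controller's main kernel thread. A minimal sketch of that idea, with hypothetical class and method names (not the actual Druid code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: retry bookkeeping that stays single-threaded.
// Other threads never touch the map directly; they enqueue a lambda that the
// controller's main kernel loop applies, so a plain HashMap suffices and the
// kernel class keeps its simple, non-thread-safe contract.
class RetryState
{
  // workerNumber -> work-order identifiers that need to be re-sent
  private final Map<Integer, List<String>> workOrdersToRetry = new HashMap<>();

  // Called only from the main kernel thread.
  void addRetry(int workerNumber, String workOrderId)
  {
    workOrdersToRetry.computeIfAbsent(workerNumber, k -> new ArrayList<>())
                     .add(workOrderId);
  }

  // Called only from the main kernel thread: drain everything queued for retry.
  Map<Integer, List<String>> drain()
  {
    final Map<Integer, List<String>> drained = new HashMap<>(workOrdersToRetry);
    workOrdersToRetry.clear();
    return drained;
  }
}
```

Under this assumption the concurrency lives in the hand-off queue (ControllerImpl's `addToKernelManipulationQueue` pattern), not in the kernel's own data structures.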



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -986,19 +1036,79 @@ private Int2ObjectMap&lt;List&lt;SegmentIdWithShardSpec&gt;&gt; makeSegmentGeneratorWorkerFa
     return retVal;
   }
 
-  private void contactWorkersForStage(final TaskContactFn contactFn, final IntSet workers)
+  /**
+   * A blocking function used to contact multiple workers. Checks if all the workers are running before contacting them.
+   *
+   * @param queryKernel
+   * @param contactFn
+   * @param workers         set of workers to contact
+   * @param successCallBack on successfull api call, custom callback
+   * @param retryOnFailure  if set to true, adds this worker to retry queue. If false, cancel all the futures and propergate the exception to the caller.
+   */
+  private void contactWorkersForStage(
+      final ControllerQueryKernel queryKernel,
+      final TaskContactFn contactFn,
+      final IntSet workers,
+      final TaskContactSuccesss successCallBack,
+      final boolean retryOnFailure
+  )
   {
     final List<String> taskIds = getTaskIds();
-    final List<ListenableFuture<Void>> taskFutures = new ArrayList<>(workers.size());
+    final List<ListenableFuture<Boolean>> taskFutures = new ArrayList<>(workers.size());
+
+    try {
+      workerTaskLauncher.waitUntilWorkersReady(workers);
+    }
+    catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    Set<Pair<Integer, String>> failedCalls = ConcurrentHashMap.newKeySet();
 
     for (int workerNumber : workers) {
       final String taskId = taskIds.get(workerNumber);
-      taskFutures.add(contactFn.contactTask(netClient, taskId, workerNumber));
+      SettableFuture<Boolean> settableFuture = SettableFuture.create();
+      ListenableFuture<Void> apiFuture = contactFn.contactTask(netClient, taskId, workerNumber);
+      Futures.addCallback(apiFuture, new FutureCallback<Void>()
+      {
+        @Override
+        public void onSuccess(@Nullable Void result)
+        {
+          successCallBack.onSuccess(taskId, workerNumber);
+          settableFuture.set(true);
+        }
+
+        @Override
+        public void onFailure(Throwable t)
+        {
+          if (retryOnFailure) {
+            log.info(
+                t,
+                "Detected failure while contacting task[%s]. Iniitiating relaunch of worker[%d] if applicable",
Review Comment:
   Initiating (spelling)



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -392,6 +543,36 @@ private void generateResultPartitionsAndBoundaries()
     }
   }
 
+  /**
+   * True if all partitions stats are present, else false.
+   */
+  private boolean allPartitionStatisticsPresent()
+  {
+    return workerToPhase.values()
+                        .stream()
+                        .filter(stagePhase -> stagePhase.equals(WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES)
+                                              || stagePhase.equals(WorkerStagePhase.PRESHUFFLE_WRITING_OUTPUT)
+                                              || stagePhase.equals(WorkerStagePhase.RESULTS_READY))
+                        .count()
+           == workerCount;
+  }
+
+  /**
+   * True if work orders needs to be sent else false.

Review Comment:
   This javadoc doesn't seem quite right either: it doesn't seem true to me 
that if all workers are in `RESULTS_READY` then work orders need to be sent. 
Please expand the javadoc to explain what this really means.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -392,6 +543,36 @@ private void generateResultPartitionsAndBoundaries()
     }
   }
 
+  /**
+   * True if all partitions stats are present, else false.
+   */
+  private boolean allPartitionStatisticsPresent()

Review Comment:
   What do you think about making the `filter` from this method, and from 
`workOrdersNeedToBeSent`, into methods on `ControllerStagePhase` itself? Like, 
`stagePhase.workOrderNeedsToBeSent()`. It would keep all the phase-related code 
together in `ControllerStagePhase.java`, which may improve readability. 
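The suggestion could be sketched roughly like this (an illustrative enum, not the real `WorkerStagePhase` or `ControllerStagePhase`; the predicate bodies reuse the filters from the diffs above):

```java
// Sketch of folding the phase filters from ControllerStageTracker into
// predicate methods on the phase enum itself, so all phase-related logic
// lives in one place. Phase names mirror the ones in the diff above.
enum StagePhaseSketch
{
  NEW,
  READING_INPUT,
  PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES,
  PRESHUFFLE_WRITING_OUTPUT,
  RESULTS_READY;

  // True while the worker still needs its work order delivered.
  boolean workOrderNeedsToBeSent()
  {
    return this == NEW;
  }

  // True once the worker is in a phase implying its result key statistics
  // have already reached the controller.
  boolean partitionStatisticsPresent()
  {
    return this == PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES
           || this == PRESHUFFLE_WRITING_OUTPUT
           || this == RESULTS_READY;
  }
}
```

Call sites would then reduce to something like `workerToPhase.values().stream().filter(StagePhaseSketch::partitionStatisticsPresent).count() == workerCount`.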



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -38,27 +41,35 @@
 import org.apache.druid.msq.input.stage.StageInputSlice;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.kernel.worker.WorkerStagePhase;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
 import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
 
 import javax.annotation.Nullable;
 import java.util.List;
+import java.util.stream.IntStream;
 
 /**
 * Controller-side state machine for each stage. Used by {@link ControllerQueryKernel} to form the overall state
  * machine for an entire query.
  * machine for an entire query.
- *
+ * <p>
 * Package-private: stage trackers are an internal implementation detail of {@link ControllerQueryKernel}, not meant
  * for separate use.
  */
 class ControllerStageTracker
 {
+  private static final Logger log = new Logger(ControllerStageTracker.class);
   private final StageDefinition stageDef;
+
   private final int workerCount;
 
   private final WorkerInputs workerInputs;
+
+  // worker-> workerStagePhase
+  private final Int2ObjectMap&lt;WorkerStagePhase&gt; workerToPhase = new Int2ObjectOpenHashMap&lt;&gt;();

Review Comment:
   This is a big change. Previously `WorkerStagePhase` was localized to the 
worker. Now it's on the controller too. Is the controller state kept in sync 
with the worker state? If so how does that work? Please explain this stuff in 
the comment.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -258,18 +355,41 @@ ControllerStagePhase addResultKeyStatisticsForWorker(
       throw new IAE("Invalid workerNumber [%s]", workerNumber);
     }
 
+    if (phase != ControllerStagePhase.READING_INPUT && phase != ControllerStagePhase.RETRYING) {
+      throw new ISE("Cannot add result key statistics from stage [%s]", phase);
+    }
+
+    WorkerStagePhase currentPhase = workerToPhase.get(workerNumber);
+
+    if (currentPhase == null) {
+      throw new ISE("Worker[%d] not found for stage[%s]", workerNumber, stageDef.getStageNumber());
+    }
+
     try {
-      if (workersWithResultKeyStatistics.add(workerNumber)) {
-        resultKeyStatisticsCollector.addAll(snapshot);
+      if (WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.canTransitionFrom(currentPhase)) {
+        workerToPhase.put(workerNumber, WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES);
 
-        if (workersWithResultKeyStatistics.size() == workerCount) {
-          generateResultPartitionsAndBoundaries();
+        // if stats already recieved for worker, donot update the sketch.

Review Comment:
   received (spelling), do not (spelling)



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -261,6 +269,9 @@ public class ControllerImpl implements Controller
 
   // Time at which the query started.
  // For live reports. Written by the main controller thread, read by HTTP threads.
+
+  // WorkerNumber -> WorkOrders

Review Comment:
   Please expand on this comment. It should answer the following things:
   
   - What is this map used for? What updates it and what reads it? Why does it 
need to be thread-safe?
   - Is the `Set<WorkOrder>` a concurrent set? Does it need to be synchronized 
in some way when you use it?
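If the answer turns out to be that the map really does cross threads, one way to answer both of the reviewer's questions directly in the code (illustrative holder class; field and type names are assumptions, not the actual `ControllerImpl` declaration) is to make the inner sets concurrent too and state the threading contract in the javadoc:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch only: demonstrates documenting the threading contract
// of a worker-number -> work-order map, not Druid's actual implementation.
class WorkOrderRetryQueueSketch
{
  /**
   * Work orders queued for re-send, keyed by worker number.
   * Written when a worker fault is observed; read by the code that relaunches
   * workers, hence the concurrent map. The inner sets are created via
   * ConcurrentHashMap.newKeySet(), so they are also safe to mutate without
   * external synchronization.
   */
  private final ConcurrentMap<Integer, Set<String>> workOrdersToRetry =
      new ConcurrentHashMap<>();

  void enqueue(int workerNumber, String workOrderId)
  {
    workOrdersToRetry
        .computeIfAbsent(workerNumber, k -> ConcurrentHashMap.newKeySet())
        .add(workOrderId);
  }

  Set<String> pending(int workerNumber)
  {
    return workOrdersToRetry.getOrDefault(workerNumber, Set.of());
  }
}
```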



##########
docs/multi-stage-query/reference.md:
##########
@@ -234,6 +234,9 @@ The following table lists query limits:
 | Number of output columns for any one stage. | 2,000 | `TooManyColumns` |
 | Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers` |
 | Maximum memory occupied by broadcasted tables. | 30% of each [processor memory bundle](concepts.md#memory-usage). | `BroadcastTablesTooLarge` |
+| Maximum relaunches per worker. Initial run is not a relaunch. The worker will be spawned 1 + workerRelaunchLimit times before erroring out. | 2 | `WorkerRelaunchedTooManyTimes` |
+| Maximum relaunches across all workers. | 30 | `TotalRelaunchLimitExceededFault` |

Review Comment:
   It'd be better if the two faults had similar names, like 
`TooManyAttemptsForWorker` and `TooManyAttemptsForJob`. Or something like that. 
Also, don't include `Fault` in the name of faults.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -107,6 +117,19 @@
    */
   private final Set<StageId> effectivelyFinishedStages = new HashSet<>();
 
+
+  /**
+   * Store the work orders for the stage so that we can retrieve that in case of worker retry
+   */
+  private final Map<StageId, Int2ObjectMap<WorkOrder>> stageWorkOrders;
+
+  /**
+   * {@link MSQFault#getErrorCode()} which are retried.
+   */
+  private final Set&lt;String&gt; retriableErrorCodes = ImmutableSet.of(CanceledFault.CODE, UnknownFault.CODE,

Review Comment:
   `static`?
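Agreeing with the suggestion: the set never varies per instance, so it can be a `static final` constant. A self-contained sketch using `Set.of` and placeholder code strings standing in for the Guava `ImmutableSet` and the real `CanceledFault.CODE` / `UnknownFault.CODE` constants:

```java
import java.util.Set;

// Sketch only: placeholder constants stand in for the real MSQ fault codes.
class RetriableCodesSketch
{
  static final String CANCELED_CODE = "Canceled";
  static final String UNKNOWN_CODE = "Unknown";

  /**
   * Error codes eligible for worker retry. Immutable and identical for every
   * kernel instance, so a static constant avoids per-instance allocation and
   * signals that the set is shared configuration, not per-query state.
   */
  private static final Set<String> RETRIABLE_ERROR_CODES =
      Set.of(CANCELED_CODE, UNKNOWN_CODE);

  static boolean isRetriable(String errorCode)
  {
    return RETRIABLE_ERROR_CODES.contains(errorCode);
  }
}
```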



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.error;
+
+public class MSQFaultUtils
+{
+
+  public static final String ERROR_CODE_DELIMITER = ": ";
+
+  /**
+   * Generate string message with error code delimited by {@link MSQFaultUtils#ERROR_CODE_DELIMITER}
+   */
+  public static String generateMessageWithErrorCode(MSQFault msqFault)
+  {
+    final String message = msqFault.getErrorMessage();
+
+    if (message != null && !message.isEmpty()) {
+      return msqFault.getErrorCode() + ERROR_CODE_DELIMITER + message;
+    } else {
+      return msqFault.getErrorCode();
+    }
+  }
+
+  /**
+   * Gets the error code from the message. If the messay is empty or null, {@link UnknownFault#CODE} is returned. This method

Review Comment:
   It looks like the purpose of this method is to extract the original error code from a `WorkerFailedFault`. This is what structured error details are for. Please consider removing this method, and instead adding a `workerError` (type MSQFault) or `workerErrorCode` (type String) field to `WorkerFailedFault`.
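A rough illustration of that structured alternative (a simplified class; the real `WorkerFailedFault` carries Jackson annotations and other fields): carrying the original code as a field avoids parsing it back out of the message string.

```java
import java.util.Objects;

// Simplified sketch: a fault that records the underlying worker error code as
// structured data instead of embedding it in the human-readable message.
class WorkerFailedFaultSketch
{
  static final String CODE = "WorkerFailed";

  private final String workerTaskId;
  private final String workerErrorCode; // e.g. "Canceled"; null if not known

  WorkerFailedFaultSketch(String workerTaskId, String workerErrorCode)
  {
    this.workerTaskId = workerTaskId;
    this.workerErrorCode = workerErrorCode;
  }

  String getErrorCode()
  {
    return CODE;
  }

  /** The original error code from the failed worker; no message parsing needed. */
  String getWorkerErrorCode()
  {
    return Objects.requireNonNullElse(workerErrorCode, "Unknown");
  }
}
```

The retry-eligibility check can then consult `getWorkerErrorCode()` directly instead of splitting the message on `": "`.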



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStagePhase.java:
##########
@@ -85,6 +85,15 @@ public boolean canTransitionFrom(final ControllerStagePhase priorPhase)
     {
       return true;
     }
+  },
+
+  // Stage currently under retry. priorPhase did not publish its final results yet.

Review Comment:
   What does "under retry" mean? What is the expected next state after this 
one? Please elaborate in the comment.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -603,6 +652,7 @@ public void updateStatus(int stageNumber, int workerNumber, Object keyStatistics
   @Override
   public void workerError(MSQErrorReport errorReport)
   {
+    // move inside kernel

Review Comment:
   Is this a todo that you mean to do prior to committing?



##########
docs/multi-stage-query/reference.md:
##########
@@ -234,6 +234,9 @@ The following table lists query limits:
 | Number of output columns for any one stage. | 2,000 | `TooManyColumns` |
 | Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers` |
 | Maximum memory occupied by broadcasted tables. | 30% of each [processor memory bundle](concepts.md#memory-usage). | `BroadcastTablesTooLarge` |
+| Maximum relaunches per worker. Initial run is not a relaunch. The worker will be spawned 1 + workerRelaunchLimit times before erroring out. | 2 | `WorkerRelaunchedTooManyTimes` |
+| Maximum relaunches across all workers. | 30 | `TotalRelaunchLimitExceededFault` |

Review Comment:
   Seems low, especially since a single machine can be running many workers 
that would all fail at the same time.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -107,6 +117,19 @@
    */
   private final Set<StageId> effectivelyFinishedStages = new HashSet<>();
 
+
+  /**
+   * Store the work orders for the stage so that we can retrieve that in case 
of worker retry

Review Comment:
   Comment should include what the int keys of the inner map are. I guess it's 
worker number, but that should be explicit in the comment.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -352,22 +401,25 @@ public Object getResultObjectForStage(final StageId stageId)
 
   /**
   * Checks if the stage can be started, delegates call to {@link ControllerStageTracker#start()} for internal phase
-   * transition and registers the transition in this queryKernel
+   * transition and registers the transition in this queryKernel. Work orders need to created via {@link ControllerQueryKernel#createWorkOrders(int, Int2ObjectMap)} before calling this method.

Review Comment:
   "need to be created" (grammar). Also this line is long.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -392,6 +543,36 @@ private void generateResultPartitionsAndBoundaries()
     }
   }
 
+  /**
+   * True if all partitions stats are present, else false.

Review Comment:
   This javadoc doesn't seem quite right: it's going to possibly return `true` 
even if `!stageDef.mustGatherResultKeyStatistics()`, but in that case of course 
there are no partition statistics. This behavior is fine, but the javadoc 
should be clearer. Please expand the javadoc to explain what this really means.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -484,6 +551,7 @@ public void transitionStageKernel(StageId stageId, ControllerStagePhase newPhase
       }
     }
 
+    // might need to change this

Review Comment:
   Why?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -340,20 +490,21 @@ void fail()
 
   /**
   * Sets {@link #resultPartitions} (always) and {@link #resultPartitionBoundaries}.
-   *
+   * <p>
    * If {@link StageDefinition#mustGatherResultKeyStatistics()} is true, this method cannot be called until after
    * statistics have been provided to {@link #addResultKeyStatisticsForWorker} for all workers.
    */
   private void generateResultPartitionsAndBoundaries()
   {
     if (resultPartitions != null) {
-      throw new ISE("Result partitions have already been generated");
+      log.debug("Partition boundaries already generated for stage %d", stageDef.getStageNumber());

Review Comment:
   Please include a comment about why it makes sense to return quietly if 
`resultPartitions` is already set.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -546,4 +614,65 @@ private static Map&lt;StageId, Set&lt;StageId&gt;&gt; computeStageOutflowMap(final QueryDefi
 
     return retVal;
   }
+
+  /**
+   * Checks the {@link MSQFault#getErrorCode()} is eligible for retry.
+   * <br/>
+   * If yes, transitions the stage to{@link ControllerStagePhase#RETRYING} and returns all the {@link WorkOrder}
+   * <br/>
+   * else throw {@link MSQException}
+   *
+   * @param workerNumber
+   * @param msqFault
+   * @return List of {@link WorkOrder} that needs to be retried.
+   */
+  public List&lt;WorkOrder&gt; getWorkInCaseWorkerElgibileForRetryElseThrow(int workerNumber, MSQFault msqFault)
+  {
+
+    final String errorCode;
+    if (msqFault instanceof WorkerFailedFault) {
+      errorCode = MSQFaultUtils.getErrorCodeFromMessage((((WorkerFailedFault) msqFault).getErrorMsg()));
+    } else {
+      errorCode = msqFault.getErrorCode();
+    }
+
+    if (retriableErrorCodes.contains(errorCode)) {
+      return getWorkInCaseWorkerElgibileForRetryElseThrow(workerNumber);
+
+    } else {
+      throw new MSQException(msqFault);
+    }
+  }
+
+  /**
+   * Gets all the stages currently being tracked and filters out all effectively finished stages.
+   * <br/>
+   * From the remaining stages, checks if (stage,worker) needs to be retried.
+   * <br/>
+   * If yes adds the workOrder for that stage to the return list and transitions the stage kernel to {@link ControllerStagePhase#RETRYING}
+   *
+   * @param worker
+   * @return List of {@link WorkOrder} that needs to be retried.
+   */
+  private List&lt;WorkOrder&gt; getWorkInCaseWorkerElgibileForRetryElseThrow(int worker)
+  {
+    List&lt;StageId&gt; trackedSet = new ArrayList&lt;&gt;(getActiveStages());
+    // no need to retry effectively finished stages
+    List&lt;StageId&gt; getEffictivelyFinishedStages = getEffectivelyFinishedStageIds();
+    trackedSet.removeAll(getEffictivelyFinishedStages);
+
+    List&lt;WorkOrder&gt; workOrders = new ArrayList&lt;&gt;();
+
+    for (StageId stageId : trackedSet) {
+      ControllerStageTracker controllerStageTracker = getStageKernelOrThrow(stageId);
+      if (ControllerStagePhase.RETRYING.canTransitionFrom(controllerStageTracker.getPhase())
+          &amp;&amp; controllerStageTracker.retryIfNeeded(worker)) {
+        workOrders.add(getWorkOrder(worker, stageId));
+        // should be a no-op. Calling for code patterns.

Review Comment:
   What does "calling for code patterns" mean?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -564,6 +585,34 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
     return queryDef;
   }
 
+  /**
+   * Adds the workorders for worker to {@link ControllerImpl#workOrdersToRetry} if the {@link ControllerQueryKernel} determines that there
+   * are work orders which needs reprocessing.
+   */
+  private void addToRetryQueue(ControllerQueryKernel kernel, int worker, MSQFault fault)
+  {
+    List&lt;WorkOrder&gt; retriableWorkOrders = kernel.getWorkInCaseWorkerElgibileForRetryElseThrow(worker, fault);
+    if (retriableWorkOrders.size() != 0) {
+      log.info("Submitting worker[%s] for relaunch because of fault[%s]", worker, fault);

Review Comment:
   Include the worker and job relaunch counter in this log message?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -551,6 +563,15 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
         id(),
         task.getDataSource(),
         context,
+        (failedTask, fault) -> {
+          addToKernelManipulationQueue((kernel) -> {
+            if (isDurableStorageEnabled) {

Review Comment:
   Agreed — it should all be controllable by the same `faultTolerant` setting. 
We don't want people to have to tweak a bunch of different settings in order to 
get the full package of behavior.
   
   IMO, it should default to false at first, while we're experimenting with it, 
but we should change the default to true once we're feeling that it's stable.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -406,7 +459,6 @@ public void addResultKeyStatisticsForStageAndWorker(
 
     // If the phase is POST_READING or FAILED, that implies the kernel has transitioned. We need to account for that
     switch (newPhase) {
-      case POST_READING:

Review Comment:
   Comment above is no longer accurate, it seems.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -165,10 +188,88 @@ ClusterByPartitions getResultPartitionBoundaries()
     }
   }
 
+
+  /**
+   * Get workers which need to be sent partition boundaries
+   *
+   * @return
+   */
+  IntSet getWorkersToSendParitionBoundaries()
+  {
+    if (!getStageDefinition().doesShuffle()) {
+      throw new ISE("Result partition information is not relevant to this stage because it does not shuffle");
+    }
+    IntAVLTreeSet workers = new IntAVLTreeSet();
+    for (Integer worker : workerToPhase.keySet()) {
+      if (WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.equals(workerToPhase.get(worker))) {
+        workers.add(worker);
+      }
+    }
+    return workers;
+  }
+
+  /**
+   * Indicates that the work order for worker has been sent. Transistions the state to {@link WorkerStagePhase#READING_INPUT}
+   * if no more work orders need to be sent.
+   *
+   * @param worker
+   */
+  void workOrderSentForWorker(int worker)
+  {
+
+    workerToPhase.compute(worker, (wk, state) -> {
+      if (state == null) {
+        throw new ISE("Worker[%d] not found for stage[%s]", wk, stageDef.getStageNumber());
+      }
+      if (!WorkerStagePhase.READING_INPUT.canTransitionFrom(state)) {
+        throw new ISE(
+            "Worker[%d] cannot transistion from state[%s] to state[%s] while sending work order",
+            worker,
+            state,
+            WorkerStagePhase.READING_INPUT
+        );
+      }
+      return WorkerStagePhase.READING_INPUT;
+    });
+    if (phase != ControllerStagePhase.READING_INPUT) {
+      if (workOrdersNeedToBeSent()) {

Review Comment:
   Something seems backwards here? The check is `workOrdersNeedToBeSent()`, but 
the comment says "if no more work orders need to be sent…". Please clarify.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStagePhase.java:
##########
@@ -35,7 +35,7 @@
     @Override
     public boolean canTransitionFrom(final ControllerStagePhase priorPhase)
     {
-      return false;
+      return true;

Review Comment:
   Seems strange to me too; under which cases can a stage transition back to 
NEW?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

