gianm commented on code in PR #13353:
URL: https://github.com/apache/druid/pull/13353#discussion_r1027845939
##########
docs/multi-stage-query/known-issues.md:
##########
@@ -29,7 +29,7 @@ sidebar_label: Known issues
## Multi-stage query task runtime
-- Fault tolerance is not implemented. If any task fails, the entire query
fails.
+- Fault tolerance is partially implemented. Workers get relaunched when they
are killed unexpectedly. The controller does not get relaunched if its killed
unexpectedly.
Review Comment:
"it is killed unexpectedly" (spelling)
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -564,6 +585,34 @@ private QueryDefinition initializeQueryDefAndState(final
Closer closer)
return queryDef;
}
+ /**
+ * Adds the workorders for worker to {@link
ControllerImpl#workOrdersToRetry} if the {@link ControllerQueryKernel}
determines that there
+ * are work orders which needs reprocessing.
+ */
+ private void addToRetryQueue(ControllerQueryKernel kernel, int worker,
MSQFault fault)
+ {
+ List<WorkOrder> retriableWorkOrders =
kernel.getWorkInCaseWorkerElgibileForRetryElseThrow(worker, fault);
Review Comment:
Eligible (spelling)
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -44,18 +52,20 @@
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
Review Comment:
ControllerQueryKernel is supposed to be a simple class that doesn't need to
be thread-safe, because it's only accessed from the main logic thread of the
controller. It's suspicious that ConcurrentHashMap is introduced here. What's
it needed for? Can we achieve the same thing while keeping the
simple/non-threadsafe nature of this class?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -986,19 +1036,79 @@ private Int2ObjectMap<List<SegmentIdWithShardSpec>>
makeSegmentGeneratorWorkerFa
return retVal;
}
- private void contactWorkersForStage(final TaskContactFn contactFn, final
IntSet workers)
+ /**
+ * A blocking function used to contact multiple workers. Checks if all the
workers are running before contacting them.
+ *
+ * @param queryKernel
+ * @param contactFn
+ * @param workers set of workers to contact
+ * @param successCallBack on successfull api call, custom callback
+ * @param retryOnFailure if set to true, adds this worker to retry queue.
If false, cancel all the futures and propergate the exception to the caller.
+ */
+ private void contactWorkersForStage(
+ final ControllerQueryKernel queryKernel,
+ final TaskContactFn contactFn,
+ final IntSet workers,
+ final TaskContactSuccesss successCallBack,
+ final boolean retryOnFailure
+ )
{
final List<String> taskIds = getTaskIds();
- final List<ListenableFuture<Void>> taskFutures = new
ArrayList<>(workers.size());
+ final List<ListenableFuture<Boolean>> taskFutures = new
ArrayList<>(workers.size());
+
+ try {
+ workerTaskLauncher.waitUntilWorkersReady(workers);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ Set<Pair<Integer, String>> failedCalls = ConcurrentHashMap.newKeySet();
for (int workerNumber : workers) {
final String taskId = taskIds.get(workerNumber);
- taskFutures.add(contactFn.contactTask(netClient, taskId, workerNumber));
+ SettableFuture<Boolean> settableFuture = SettableFuture.create();
+ ListenableFuture<Void> apiFuture = contactFn.contactTask(netClient,
taskId, workerNumber);
+ Futures.addCallback(apiFuture, new FutureCallback<Void>()
+ {
+ @Override
+ public void onSuccess(@Nullable Void result)
+ {
+ successCallBack.onSuccess(taskId, workerNumber);
+ settableFuture.set(true);
+ }
+
+ @Override
+ public void onFailure(Throwable t)
+ {
+ if (retryOnFailure) {
+ log.info(
+ t,
+ "Detected failure while contacting task[%s]. Iniitiating
relaunch of worker[%d] if applicable",
Review Comment:
Initiating (spelling)
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -392,6 +543,36 @@ private void generateResultPartitionsAndBoundaries()
}
}
+ /**
+ * True if all partitions stats are present, else false.
+ */
+ private boolean allPartitionStatisticsPresent()
+ {
+ return workerToPhase.values()
+ .stream()
+ .filter(stagePhase ->
stagePhase.equals(WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES)
+ ||
stagePhase.equals(WorkerStagePhase.PRESHUFFLE_WRITING_OUTPUT)
+ ||
stagePhase.equals(WorkerStagePhase.RESULTS_READY))
+ .count()
+ == workerCount;
+ }
+
+ /**
+ * True if work orders needs to be sent else false.
Review Comment:
This javadoc doesn't seem quite right either: it doesn't seem true to me
that if all workers are in `RESULTS_READY` then work orders need to be sent.
Please expand the javadoc to explain what this really means.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -392,6 +543,36 @@ private void generateResultPartitionsAndBoundaries()
}
}
+ /**
+ * True if all partitions stats are present, else false.
+ */
+ private boolean allPartitionStatisticsPresent()
Review Comment:
What do you think about making the `filter` from this method, and from
`workOrdersNeedToBeSent`, into methods on `ControllerStagePhase` itself? Like,
`stagePhase.workOrderNeedsToBeSent()`. It would keep all the phase-related code
together in `ControllerStagePhase.java`, which may improve readability.
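A hedged sketch of this suggestion (an illustrative stand-in, not the actual Druid `WorkerStagePhase` enum; the method name is an assumption):

```java
enum WorkerStagePhaseSketch
{
  NEW,
  READING_INPUT,
  PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES,
  PRESHUFFLE_WRITING_OUTPUT,
  RESULTS_READY;

  // True if a worker in this phase has already reported its partition
  // statistics; this is the filter from allPartitionStatisticsPresent(),
  // relocated onto the phase enum as suggested above.
  boolean hasReportedPartitionStatistics()
  {
    return this == PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES
           || this == PRESHUFFLE_WRITING_OUTPUT
           || this == RESULTS_READY;
  }
}
```

The tracker's stream could then read `.filter(WorkerStagePhaseSketch::hasReportedPartitionStatistics)`, keeping all phase semantics in one file.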
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -38,27 +41,35 @@
import org.apache.druid.msq.input.stage.StageInputSlice;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.kernel.worker.WorkerStagePhase;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.stream.IntStream;
/**
* Controller-side state machine for each stage. Used by {@link
ControllerQueryKernel} to form the overall state
* machine for an entire query.
- *
+ * <p>
* Package-private: stage trackers are an internal implementation detail of
{@link ControllerQueryKernel}, not meant
* for separate use.
*/
class ControllerStageTracker
{
+ private static final Logger log = new Logger(ControllerStageTracker.class);
private final StageDefinition stageDef;
+
private final int workerCount;
private final WorkerInputs workerInputs;
+
+ // worker-> workerStagePhase
+ private final Int2ObjectMap<WorkerStagePhase> workerToPhase = new
Int2ObjectOpenHashMap<>();
Review Comment:
This is a big change. Previously `WorkerStagePhase` was localized to the
worker. Now it's on the controller too. Is the controller state kept in sync
with the worker state? If so how does that work? Please explain this stuff in
the comment.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -258,18 +355,41 @@ ControllerStagePhase addResultKeyStatisticsForWorker(
throw new IAE("Invalid workerNumber [%s]", workerNumber);
}
+ if (phase != ControllerStagePhase.READING_INPUT && phase !=
ControllerStagePhase.RETRYING) {
+ throw new ISE("Cannot add result key statistics from stage [%s]", phase);
+ }
+
+ WorkerStagePhase currentPhase = workerToPhase.get(workerNumber);
+
+ if (currentPhase == null) {
+ throw new ISE("Worker[%d] not found for stage[%s]", workerNumber,
stageDef.getStageNumber());
+ }
+
try {
- if (workersWithResultKeyStatistics.add(workerNumber)) {
- resultKeyStatisticsCollector.addAll(snapshot);
+ if
(WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.canTransitionFrom(currentPhase))
{
+ workerToPhase.put(workerNumber,
WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES);
- if (workersWithResultKeyStatistics.size() == workerCount) {
- generateResultPartitionsAndBoundaries();
+ // if stats already recieved for worker, donot update the sketch.
Review Comment:
received (spelling), do not (spelling)
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -261,6 +269,9 @@ public class ControllerImpl implements Controller
// Time at which the query started.
// For live reports. Written by the main controller thread, read by HTTP
threads.
+
+ // WorkerNumber -> WorkOrders
Review Comment:
Please expand on this comment. It should answer the following things:
- What is this map used for? What updates it and what reads it? Why does it
need to be thread-safe?
- Is the `Set<WorkOrder>` a concurrent set? Does it need to be synchronized
in some way when you use it?
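As an example of the level of documentation being asked for, a sketch of a retry map with the threading contract spelled out (the threading claims and the `String` stand-in for `WorkOrder` are assumptions for illustration, not verified facts about `ControllerImpl`):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class RetryQueueSketch
{
  /**
   * Worker number -> work orders that must be re-sent to that worker after a
   * relaunch. Written when a worker failure is detected, drained when the
   * relaunched worker is ready; the outer map is concurrent because writer
   * and reader may be different threads, and each inner set is a concurrent
   * set so additions need no external synchronization.
   */
  private final Map<Integer, Set<String>> workOrdersToRetry = new ConcurrentHashMap<>();

  void addRetry(int worker, String workOrder)
  {
    workOrdersToRetry.computeIfAbsent(worker, w -> ConcurrentHashMap.newKeySet())
                     .add(workOrder);
  }

  /** Removes and returns the pending work orders for a worker (empty if none). */
  Set<String> drainRetries(int worker)
  {
    Set<String> orders = workOrdersToRetry.remove(worker);
    return orders == null ? Set.of() : orders;
  }
}
```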
##########
docs/multi-stage-query/reference.md:
##########
@@ -234,6 +234,9 @@ The following table lists query limits:
| Number of output columns for any one stage. | 2,000 | `TooManyColumns` |
| Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent
soft limit may be lower. | `TooManyWorkers` |
| Maximum memory occupied by broadcasted tables. | 30% of each [processor
memory bundle](concepts.md#memory-usage). | `BroadcastTablesTooLarge` |
+| Maximum relaunches per worker. Initial run is not a relaunch. The worker
will be spawned 1 + workerRelaunchLimit times before erroring out. | 2 |
`WorkerRelaunchedTooManyTimes` |
+| Maximum relaunches across all workers. | 30 |
`TotalRelaunchLimitExceededFault` |
Review Comment:
It'd be better if the two faults had similar names, like
`TooManyAttemptsForWorker` and `TooManyAttemptsForJob`. Or something like that.
Also, don't include `Fault` in the name of faults.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -107,6 +117,19 @@
*/
private final Set<StageId> effectivelyFinishedStages = new HashSet<>();
+
+ /**
+ * Store the work orders for the stage so that we can retrieve that in case
of worker retry
+ */
+ private final Map<StageId, Int2ObjectMap<WorkOrder>> stageWorkOrders;
+
+ /**
+ * {@link MSQFault#getErrorCode()} which are retried.
+ */
+ private final Set<String> retriableErrorCodes =
ImmutableSet.of(CanceledFault.CODE, UnknownFault.CODE,
Review Comment:
`static`?
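A sketch of the suggested change: since the retriable error codes never vary per instance, the set can be a `static final` constant. The code strings here are illustrative placeholders, not the actual Druid `CODE` constants.

```java
import java.util.Set;

class RetriableCodesSketch
{
  // Shared across all kernel instances; never mutated after construction.
  private static final Set<String> RETRIABLE_ERROR_CODES =
      Set.of("Canceled", "Unknown", "WorkerFailed", "WorkerRpcFailed");

  static boolean isRetriable(String errorCode)
  {
    return RETRIABLE_ERROR_CODES.contains(errorCode);
  }
}
```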
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.error;
+
+public class MSQFaultUtils
+{
+
+ public static final String ERROR_CODE_DELIMITER = ": ";
+
+ /**
+ * Generate string message with error code delimited by {@link
MSQFaultUtils#ERROR_CODE_DELIMITER}
+ */
+ public static String generateMessageWithErrorCode(MSQFault msqFault)
+ {
+ final String message = msqFault.getErrorMessage();
+
+ if (message != null && !message.isEmpty()) {
+ return msqFault.getErrorCode() + ERROR_CODE_DELIMITER + message;
+ } else {
+ return msqFault.getErrorCode();
+ }
+ }
+
+ /**
+ * Gets the error code from the message. If the messay is empty or null,
{@link UnknownFault#CODE} is returned. This method
Review Comment:
It looks like the purpose of this method is to extract the original error
code from a `WorkerFailedFault`. This is what structured error details are
for. Please consider removing this method, and instead adding a `workerError`
(type MSQFault) or `workerErrorCode` (type String) field to `WorkerFailedFault`.
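A minimal sketch of that alternative (field and accessor shapes are assumptions, not the actual Druid fault API):

```java
class WorkerFailedFaultSketch
{
  private final String taskId;
  // Structured detail: the original error code reported by the failed worker,
  // so the controller can decide retriability without parsing a message string.
  private final String workerErrorCode;

  WorkerFailedFaultSketch(String taskId, String workerErrorCode)
  {
    this.taskId = taskId;
    this.workerErrorCode = workerErrorCode;
  }

  String getErrorCode()
  {
    return "WorkerFailed";
  }

  String getWorkerErrorCode()
  {
    return workerErrorCode;
  }
}
```

With this in place, `getErrorCodeFromMessage` and its string-splitting on `": "` become unnecessary.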
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStagePhase.java:
##########
@@ -85,6 +85,15 @@ public boolean canTransitionFrom(final ControllerStagePhase
priorPhase)
{
return true;
}
+ },
+
+ // Stage currently under retry. priorPhase did not publish its final results
yet.
Review Comment:
What does "under retry" mean? What is the expected next state after this
one? Please elaborate in the comment.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -603,6 +652,7 @@ public void updateStatus(int stageNumber, int workerNumber,
Object keyStatistics
@Override
public void workerError(MSQErrorReport errorReport)
{
+ // move inside kernel
Review Comment:
Is this a todo that you mean to do prior to committing?
##########
docs/multi-stage-query/reference.md:
##########
@@ -234,6 +234,9 @@ The following table lists query limits:
| Number of output columns for any one stage. | 2,000 | `TooManyColumns` |
| Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent
soft limit may be lower. | `TooManyWorkers` |
| Maximum memory occupied by broadcasted tables. | 30% of each [processor
memory bundle](concepts.md#memory-usage). | `BroadcastTablesTooLarge` |
+| Maximum relaunches per worker. Initial run is not a relaunch. The worker
will be spawned 1 + workerRelaunchLimit times before erroring out. | 2 |
`WorkerRelaunchedTooManyTimes` |
+| Maximum relaunches across all workers. | 30 |
`TotalRelaunchLimitExceededFault` |
Review Comment:
Seems low, especially since a single machine can be running many workers
that would all fail at the same time.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -107,6 +117,19 @@
*/
private final Set<StageId> effectivelyFinishedStages = new HashSet<>();
+
+ /**
+ * Store the work orders for the stage so that we can retrieve that in case
of worker retry
Review Comment:
Comment should include what the int keys of the inner map are. I guess it's
worker number, but that should be explicit in the comment.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -352,22 +401,25 @@ public Object getResultObjectForStage(final StageId
stageId)
/**
* Checks if the stage can be started, delegates call to {@link
ControllerStageTracker#start()} for internal phase
- * transition and registers the transition in this queryKernel
+ * transition and registers the transition in this queryKernel. Work orders
need to created via {@link ControllerQueryKernel#createWorkOrders(int,
Int2ObjectMap)} before calling this method.
Review Comment:
"need to be created" (grammar). Also this line is long.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -392,6 +543,36 @@ private void generateResultPartitionsAndBoundaries()
}
}
+ /**
+ * True if all partitions stats are present, else false.
Review Comment:
This javadoc doesn't seem quite right: it's going to possibly return `true`
even if `!stageDef.mustGatherResultKeyStatistics()`, but in that case of course
there are no partition statistics. This behavior is fine, but the javadoc
should be clearer. Please expand the javadoc to explain what this really means.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -484,6 +551,7 @@ public void transitionStageKernel(StageId stageId,
ControllerStagePhase newPhase
}
}
+ // might need to change this
Review Comment:
Why?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -340,20 +490,21 @@ void fail()
/**
* Sets {@link #resultPartitions} (always) and {@link
#resultPartitionBoundaries}.
- *
+ * <p>
* If {@link StageDefinition#mustGatherResultKeyStatistics()} is true, this
method cannot be called until after
* statistics have been provided to {@link #addResultKeyStatisticsForWorker}
for all workers.
*/
private void generateResultPartitionsAndBoundaries()
{
if (resultPartitions != null) {
- throw new ISE("Result partitions have already been generated");
+ log.debug("Partition boundaries already generated for stage %d",
stageDef.getStageNumber());
Review Comment:
Please include a comment about why it makes sense to return quietly if
`resultPartitions` is already set.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -546,4 +614,65 @@ private static Map<StageId, Set<StageId>>
computeStageOutflowMap(final QueryDefi
return retVal;
}
+
+ /**
+ * Checks the {@link MSQFault#getErrorCode()} is eligible for retry.
+ * <br/>
+ * If yes, transitions the stage to{@link ControllerStagePhase#RETRYING} and
returns all the {@link WorkOrder}
+ * <br/>
+ * else throw {@link MSQException}
+ *
+ * @param workerNumber
+ * @param msqFault
+ * @return List of {@link WorkOrder} that needs to be retried.
+ */
+ public List<WorkOrder> getWorkInCaseWorkerElgibileForRetryElseThrow(int
workerNumber, MSQFault msqFault)
+ {
+
+ final String errorCode;
+ if (msqFault instanceof WorkerFailedFault) {
+ errorCode = MSQFaultUtils.getErrorCodeFromMessage((((WorkerFailedFault)
msqFault).getErrorMsg()));
+ } else {
+ errorCode = msqFault.getErrorCode();
+ }
+
+ if (retriableErrorCodes.contains(errorCode)) {
+ return getWorkInCaseWorkerElgibileForRetryElseThrow(workerNumber);
+
+ } else {
+ throw new MSQException(msqFault);
+ }
+ }
+
+ /**
+ * Gets all the stages currently being tracked and filters out all
effectively finished stages.
+ * <br/>
+ * From the remaining stages, checks if (stage,worker) needs to be retried.
+ * <br/>
+ * If yes adds the workOrder for that stage to the return list and
transitions the stage kernel to {@link ControllerStagePhase#RETRYING}
+ *
+ * @param worker
+ * @return List of {@link WorkOrder} that needs to be retried.
+ */
+ private List<WorkOrder> getWorkInCaseWorkerElgibileForRetryElseThrow(int
worker)
+ {
+ List<StageId> trackedSet = new ArrayList<>(getActiveStages());
+ // no need to retry effectively finished stages
+ List<StageId> getEffictivelyFinishedStages =
getEffectivelyFinishedStageIds();
+ trackedSet.removeAll(getEffictivelyFinishedStages);
+
+ List<WorkOrder> workOrders = new ArrayList<>();
+
+ for (StageId stageId : trackedSet) {
+ ControllerStageTracker controllerStageTracker =
getStageKernelOrThrow(stageId);
+ if
(ControllerStagePhase.RETRYING.canTransitionFrom(controllerStageTracker.getPhase())
+ && controllerStageTracker.retryIfNeeded(worker)) {
+ workOrders.add(getWorkOrder(worker, stageId));
+ // should be a no-op. Calling for code patterns.
Review Comment:
What does "calling for code patterns" mean?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -564,6 +585,34 @@ private QueryDefinition initializeQueryDefAndState(final
Closer closer)
return queryDef;
}
+ /**
+ * Adds the workorders for worker to {@link
ControllerImpl#workOrdersToRetry} if the {@link ControllerQueryKernel}
determines that there
+ * are work orders which needs reprocessing.
+ */
+ private void addToRetryQueue(ControllerQueryKernel kernel, int worker,
MSQFault fault)
+ {
+ List<WorkOrder> retriableWorkOrders =
kernel.getWorkInCaseWorkerElgibileForRetryElseThrow(worker, fault);
+ if (retriableWorkOrders.size() != 0) {
+ log.info("Submitting worker[%s] for relaunch because of fault[%s]",
worker, fault);
Review Comment:
Include the worker and job relaunch counter in this log message?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -551,6 +563,15 @@ private QueryDefinition initializeQueryDefAndState(final
Closer closer)
id(),
task.getDataSource(),
context,
+ (failedTask, fault) -> {
+ addToKernelManipulationQueue((kernel) -> {
+ if (isDurableStorageEnabled) {
Review Comment:
Agreed — it should all be controllable by the same `faultTolerant` setting.
We don't want people to have to tweak a bunch of different settings in order to
get the full package of behavior.
IMO, it should default to false at first, while we're experimenting with it,
but we should change the default to true once we're feeling that it's stable.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -406,7 +459,6 @@ public void addResultKeyStatisticsForStageAndWorker(
// If the phase is POST_READING or FAILED, that implies the kernel has
transitioned. We need to account for that
switch (newPhase) {
- case POST_READING:
Review Comment:
Comment above is no longer accurate, it seems.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java:
##########
@@ -165,10 +188,88 @@ ClusterByPartitions getResultPartitionBoundaries()
}
}
+
+ /**
+ * Get workers which need to be sent partition boundaries
+ *
+ * @return
+ */
+ IntSet getWorkersToSendParitionBoundaries()
+ {
+ if (!getStageDefinition().doesShuffle()) {
+ throw new ISE("Result partition information is not relevant to this
stage because it does not shuffle");
+ }
+ IntAVLTreeSet workers = new IntAVLTreeSet();
+ for (Integer worker : workerToPhase.keySet()) {
+ if
(WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.equals(workerToPhase.get(worker)))
{
+ workers.add(worker);
+ }
+ }
+ return workers;
+ }
+
+ /**
+ * Indicates that the work order for worker has been sent. Transistions the
state to {@link WorkerStagePhase#READING_INPUT}
+ * if no more work orders need to be sent.
+ *
+ * @param worker
+ */
+ void workOrderSentForWorker(int worker)
+ {
+
+ workerToPhase.compute(worker, (wk, state) -> {
+ if (state == null) {
+ throw new ISE("Worker[%d] not found for stage[%s]", wk,
stageDef.getStageNumber());
+ }
+ if (!WorkerStagePhase.READING_INPUT.canTransitionFrom(state)) {
+ throw new ISE(
+ "Worker[%d] cannot transistion from state[%s] to state[%s] while
sending work order",
+ worker,
+ state,
+ WorkerStagePhase.READING_INPUT
+ );
+ }
+ return WorkerStagePhase.READING_INPUT;
+ });
+ if (phase != ControllerStagePhase.READING_INPUT) {
+ if (workOrdersNeedToBeSent()) {
Review Comment:
Something seems backwards here? The check is `workOrdersNeedToBeSent()`, but
the comment says "if no more work orders need to be sent…". Please clarify.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStagePhase.java:
##########
@@ -35,7 +35,7 @@
@Override
public boolean canTransitionFrom(final ControllerStagePhase priorPhase)
{
- return false;
+ return true;
Review Comment:
Seems strange to me too; under which cases can a stage transition back to
NEW?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]