LakshSingla commented on code in PR #13353:
URL: https://github.com/apache/druid/pull/13353#discussion_r1023465226
##########
docs/multi-stage-query/reference.md:
##########
@@ -265,7 +268,9 @@ The following table describes error codes you may encounter in the `multiStageQu
| `TooManyColumns` | Exceeded the number of columns for a stage. See the [Limits](#limits) table for the specific limit. | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded. |
| `TooManyWarnings` | Exceeded the allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit. <br /><br />`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. |
| `TooManyWorkers` | Exceeded the supported number of workers running simultaneously. See the [Limits](#limits) table for the specific limit. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously. <br /><br />`maxWorkers`: The hard or soft limit on workers that was exceeded. |
+| `TotalRelaunchLimitExceededFault` | Total relaunch count across all workers exceeded max relaunch limit. See the [Limits](#limits) table for the specific limit. | `maxRelaunchCount`: Max number of relaunches across all the workers defined in the [Limits](#limits) section. <br /><br /> `currentRelaunchCount`: current relaunch counter for the job. <br /><br /> `taskId`: Latest task id which failed <br /> <br /> `rootErrorMessage`: Error message of the latest failed task.|
Review Comment:
Can you please clarify whether `currentRelaunchCount` is the relaunch count
for the current worker or the cumulative relaunch count across all workers?
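To make the two readings concrete, here is a minimal sketch of a counter that
tracks both values. `RelaunchCounterSketch` and its method names are
hypothetical and are not taken from this PR; the point is only that the docs
row should say which of the two numbers `currentRelaunchCount` reports.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: this class is not part of the PR.
class RelaunchCounterSketch
{
  // Relaunches recorded per worker number.
  private final Map<Integer, Integer> perWorker = new HashMap<>();
  // Relaunches summed across all workers.
  private int total = 0;

  // Record one relaunch of the given worker.
  void recordRelaunch(int workerNumber)
  {
    perWorker.merge(workerNumber, 1, Integer::sum);
    total++;
  }

  // Reading 1: the relaunch count of a single worker.
  int perWorkerCount(int workerNumber)
  {
    return perWorker.getOrDefault(workerNumber, 0);
  }

  // Reading 2: the cumulative relaunch count for the whole job.
  int totalCount()
  {
    return total;
  }
}
```

With two relaunches of worker 0 and one of worker 3, the per-worker reading
for worker 0 is 2 while the cumulative reading is 3, so the two
interpretations diverge as soon as more than one worker is relaunched.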
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -551,6 +563,15 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
id(),
task.getDataSource(),
context,
+ (failedTask, fault) -> {
+ addToKernelManipulationQueue((kernel) -> {
+ if (isDurableStorageEnabled) {
Review Comment:
I think that across this PR and https://github.com/apache/druid/pull/13368 we
need to unify what "enabling durable storage", "enabling fault tolerance",
and "enabling durable storage for intermediate steps" each mean. Otherwise the
overlap may cause confusion in the code as well as for the end user
configuring the properties.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/WorkerRelaunchedTooManyTimes.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.error;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Objects;
+
+@JsonTypeName(WorkerRelaunchedTooManyTimes.CODE)
+public class WorkerRelaunchedTooManyTimes extends BaseMSQFault
+{
+ static final String CODE = "WorkerRelaunchedTooManyTimes";
+
+
+ private final int maxPerWorkerRelaunchCount;
+
+ private final String taskId;
+
+ private final int workerNumber;
+
+
+ private final String rootErrorMessage;
+
+ @JsonCreator
+ public WorkerRelaunchedTooManyTimes(
+ @JsonProperty("maxPerWorkerRelaunchCount") int maxPerWorkerRelaunchCount,
+ @JsonProperty("taskId") String taskId,
+ @JsonProperty("workerNumber") int workerNumber,
+ @JsonProperty("rootErrorMessage") String rootErrorMessage
+ )
+ {
+ super(
+ CODE,
+ "Worker[%d] exceeded max relaunch count of %d for task[%s]. Latest failure reason: %s.",
+ workerNumber,
+ maxPerWorkerRelaunchCount,
+ taskId,
+ rootErrorMessage
+ );
+ this.maxPerWorkerRelaunchCount = maxPerWorkerRelaunchCount;
+ this.taskId = taskId;
+ this.workerNumber = workerNumber;
+ this.rootErrorMessage = rootErrorMessage;
+ }
+
+ @JsonProperty
+ public int getMaxPerWorkerRelaunchCount()
+ {
+ return maxPerWorkerRelaunchCount;
+ }
+
+ @JsonProperty
+ public int getWorkerNumber()
+ {
+ return workerNumber;
+ }
+
+ @JsonProperty
+ public String getTaskId()
+ {
+ return taskId;
+ }
+
+ @JsonProperty
+ public String getRootErrorMessage()
+ {
+ return rootErrorMessage;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ WorkerRelaunchedTooManyTimes that = (WorkerRelaunchedTooManyTimes) o;
+ return maxPerWorkerRelaunchCount == that.maxPerWorkerRelaunchCount
Review Comment:
nit: Indentation seems a bit off
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -546,4 +614,65 @@ private static Map<StageId, Set<StageId>> computeStageOutflowMap(final QueryDefi
return retVal;
}
+
+ /**
+ * Checks the {@link MSQFault#getErrorCode()} is eligible for retry.
+ * <br/>
+ * If yes, transitions the stage to{@link ControllerStagePhase#RETRYING} and returns all the {@link WorkOrder}
+ * <br/>
+ * else throw {@link MSQException}
+ *
+ * @param workerNumber
+ * @param msqFault
+ * @return List of {@link WorkOrder} that needs to be retried.
+ */
+ public List<WorkOrder> getWorkInCaseWorkerElgibileForRetryElseThrow(int workerNumber, MSQFault msqFault)
+ {
+
+ final String errorCode;
+ if (msqFault instanceof WorkerFailedFault) {
+ errorCode = MSQFaultUtils.getErrorCodeFromMessage((((WorkerFailedFault) msqFault).getErrorMsg()));
+ } else {
+ errorCode = msqFault.getErrorCode();
+ }
+
+ if (retriableErrorCodes.contains(errorCode)) {
+ return getWorkInCaseWorkerElgibileForRetryElseThrow(workerNumber);
+
+ } else {
+ throw new MSQException(msqFault);
+ }
+ }
+
+ /**
+ * Gets all the stages currently being tracked and filters out all effectively finished stages.
+ * <br/>
+ * From the remaining stages, checks if (stage,worker) needs to be retried.
+ * <br/>
+ * If yes adds the workOrder for that stage to the return list and transitions the stage kernel to {@link ControllerStagePhase#RETRYING}
+ *
+ * @param worker
+ * @return List of {@link WorkOrder} that needs to be retried.
+ */
+ private List<WorkOrder> getWorkInCaseWorkerElgibileForRetryElseThrow(int worker)
+ {
+ List<StageId> trackedSet = new ArrayList<>(getActiveStages());
+ // no need to retry effectively finished stages
+ List<StageId> getEffictivelyFinishedStages = getEffectivelyFinishedStageIds();
Review Comment:
nit: spelling (`Effictively` should be `Effectively`)
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStagePhase.java:
##########
@@ -35,7 +35,7 @@
@Override
public boolean canTransitionFrom(final ControllerStagePhase priorPhase)
{
- return false;
+ return true;
Review Comment:
`NEW` should be the initial state, so it should only be set when we create
the `ControllerStagePhase`. I don't think we should allow any transition to
`NEW`.
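A minimal sketch of the shape I have in mind, assuming a simplified phase
enum. `PhaseSketch` and its `RUNNING` phase are hypothetical; only the names
`NEW` and `RETRYING` come from this PR, and the real `ControllerStagePhase`
has more phases and different rules.

```java
// Hypothetical phases for illustration; this is not the real ControllerStagePhase.
enum PhaseSketch
{
  // Initial phase: assigned at construction, never the target of a transition.
  NEW {
    @Override
    boolean canTransitionFrom(PhaseSketch priorPhase)
    {
      return false;
    }
  },

  // Assumed working phase, reachable from a fresh stage or from a retrying one.
  RUNNING {
    @Override
    boolean canTransitionFrom(PhaseSketch priorPhase)
    {
      return priorPhase == NEW || priorPhase == RETRYING;
    }
  },

  // Retry phase, reachable only from a stage that was running.
  RETRYING {
    @Override
    boolean canTransitionFrom(PhaseSketch priorPhase)
    {
      return priorPhase == RUNNING;
    }
  };

  abstract boolean canTransitionFrom(PhaseSketch priorPhase);
}
```

In this sketch a retried stage goes `RETRYING` to `RUNNING` without passing
back through `NEW`, so `NEW.canTransitionFrom(...)` can keep returning
`false`.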
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -546,4 +614,65 @@ private static Map<StageId, Set<StageId>> computeStageOutflowMap(final QueryDefi
return retVal;
}
+
+ /**
+ * Checks the {@link MSQFault#getErrorCode()} is eligible for retry.
+ * <br/>
+ * If yes, transitions the stage to{@link ControllerStagePhase#RETRYING} and returns all the {@link WorkOrder}
+ * <br/>
+ * else throw {@link MSQException}
+ *
+ * @param workerNumber
+ * @param msqFault
+ * @return List of {@link WorkOrder} that needs to be retried.
+ */
+ public List<WorkOrder> getWorkInCaseWorkerElgibileForRetryElseThrow(int workerNumber, MSQFault msqFault)
+ {
+
+ final String errorCode;
+ if (msqFault instanceof WorkerFailedFault) {
+ errorCode = MSQFaultUtils.getErrorCodeFromMessage((((WorkerFailedFault) msqFault).getErrorMsg()));
+ } else {
+ errorCode = msqFault.getErrorCode();
+ }
+
+ if (retriableErrorCodes.contains(errorCode)) {
+ return getWorkInCaseWorkerElgibileForRetryElseThrow(workerNumber);
+
+ } else {
+ throw new MSQException(msqFault);
+ }
+ }
+
+ /**
+ * Gets all the stages currently being tracked and filters out all effectively finished stages.
+ * <br/>
+ * From the remaining stages, checks if (stage,worker) needs to be retried.
+ * <br/>
+ * If yes adds the workOrder for that stage to the return list and transitions the stage kernel to {@link ControllerStagePhase#RETRYING}
+ *
+ * @param worker
+ * @return List of {@link WorkOrder} that needs to be retried.
+ */
+ private List<WorkOrder> getWorkInCaseWorkerElgibileForRetryElseThrow(int worker)
Review Comment:
This method does not throw anywhere, so the `ElseThrow` suffix is misleading.
We should rename it.
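One possible split, as a rough sketch only. Every name here is hypothetical
(`RetryNamingSketch`, `getWorkIfRetriableElseThrow`, `collectWorkForRetry`),
plain strings stand in for `WorkOrder` and for the fault's error code, and
`IllegalStateException` stands in for `MSQException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the kernel; not the real ControllerQueryKernel.
class RetryNamingSketch
{
  private final Set<String> retriableErrorCodes;

  RetryNamingSketch(Set<String> retriableErrorCodes)
  {
    this.retriableErrorCodes = retriableErrorCodes;
  }

  // Entry point: this overload can throw, so an "ElseThrow" suffix fits here.
  List<String> getWorkIfRetriableElseThrow(int worker, String errorCode)
  {
    if (retriableErrorCodes.contains(errorCode)) {
      return collectWorkForRetry(worker);
    }
    throw new IllegalStateException("Error code [" + errorCode + "] is not retriable");
  }

  // Helper: never throws, so its name describes only what it returns.
  private List<String> collectWorkForRetry(int worker)
  {
    List<String> workOrders = new ArrayList<>();
    workOrders.add("workOrder-for-worker-" + worker);
    return workOrders;
  }
}
```

Keeping the throwing behaviour in the name of only the method that throws
makes the call sites easier to read.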