LakshSingla commented on code in PR #13353:
URL: https://github.com/apache/druid/pull/13353#discussion_r1023465226


##########
docs/multi-stage-query/reference.md:
##########
@@ -265,7 +268,9 @@ The following table describes error codes you may encounter 
in the `multiStageQu
 | `TooManyColumns` | Exceeded the number of columns for a stage. See the 
[Limits](#limits) table for the specific limit. | `numColumns`: The number of 
columns requested.<br /><br />`maxColumns`: The limit on columns which was 
exceeded. |
 | `TooManyWarnings` | Exceeded the allowed number of warnings of a particular 
type. | `rootErrorCode`: The error code corresponding to the exception that 
exceeded the required limit. <br /><br />`maxWarnings`: Maximum number of 
warnings that are allowed for the corresponding `rootErrorCode`. |
 | `TooManyWorkers` | Exceeded the supported number of workers running 
simultaneously. See the [Limits](#limits) table for the specific limit. | 
`workers`: The number of simultaneously running workers that exceeded a hard or 
soft limit. This may be larger than the number of workers in any one stage if 
multiple stages are running simultaneously. <br /><br />`maxWorkers`: The hard 
or soft limit on workers that was exceeded. |
+| `TotalRelaunchLimitExceededFault` | Total relaunch count across all workers 
exceeded max relaunch limit. See the [Limits](#limits) table for the specific 
limit. | `maxRelaunchCount`: Max number of relaunches across all the workers 
defined in the [Limits](#limits) section. <br /><br /> `currentRelaunchCount`: 
current relaunch counter for the job. <br /><br /> `taskId`: Latest task id 
which failed <br /> <br /> `rootErrorMessage`: Error message of the latest 
failed task.|

Review Comment:
   Can you please clarify if `currentRelaunchCount` is the relaunch count for 
the current worker or is the cumulative relaunch count across all the workers? 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -551,6 +563,15 @@ private QueryDefinition initializeQueryDefAndState(final 
Closer closer)
         id(),
         task.getDataSource(),
         context,
+        (failedTask, fault) -> {
+          addToKernelManipulationQueue((kernel) -> {
+            if (isDurableStorageEnabled) {

Review Comment:
   I think across this PR and https://github.com/apache/druid/pull/13368, we 
might need to unify what enabling durable storage, enabling fault tolerance, 
and enabling durable storage for intermediate steps mean since that might cause 
confusion in the code as well as to the end user configuring the properties.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/WorkerRelaunchedTooManyTimes.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.error;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Objects;
+
+@JsonTypeName(WorkerRelaunchedTooManyTimes.CODE)
+public class WorkerRelaunchedTooManyTimes extends BaseMSQFault
+{
+  static final String CODE = "WorkerRelaunchedTooManyTimes";
+
+
+  private final int maxPerWorkerRelaunchCount;
+
+  private final String taskId;
+
+  private final int workerNumber;
+
+
+  private final String rootErrorMessage;
+
+  @JsonCreator
+  public WorkerRelaunchedTooManyTimes(
+      @JsonProperty("maxPerWorkerRelaunchCount") int maxPerWorkerRelaunchCount,
+      @JsonProperty("taskId") String taskId,
+      @JsonProperty("workerNumber") int workerNumber,
+      @JsonProperty("rootErrorMessage") String rootErrorMessage
+  )
+  {
+    super(
+        CODE,
+        "Worker[%d] exceeded max relaunch count of %d for task[%s]. Latest 
failure reason: %s.",
+        workerNumber,
+        maxPerWorkerRelaunchCount,
+        taskId,
+        rootErrorMessage
+    );
+    this.maxPerWorkerRelaunchCount = maxPerWorkerRelaunchCount;
+    this.taskId = taskId;
+    this.workerNumber = workerNumber;
+    this.rootErrorMessage = rootErrorMessage;
+  }
+
+  @JsonProperty
+  public int getMaxPerWorkerRelaunchCount()
+  {
+    return maxPerWorkerRelaunchCount;
+  }
+
+  @JsonProperty
+  public int getWorkerNumber()
+  {
+    return workerNumber;
+  }
+
+  @JsonProperty
+  public String getTaskId()
+  {
+    return taskId;
+  }
+
+  @JsonProperty
+  public String getRootErrorMessage()
+  {
+    return rootErrorMessage;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    WorkerRelaunchedTooManyTimes that = (WorkerRelaunchedTooManyTimes) o;
+    return maxPerWorkerRelaunchCount == that.maxPerWorkerRelaunchCount

Review Comment:
   nit: Indentation seems a bit off



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -546,4 +614,65 @@ private static Map<StageId, Set<StageId>> 
computeStageOutflowMap(final QueryDefi
 
     return retVal;
   }
+
+  /**
+   * Checks the {@link MSQFault#getErrorCode()} is eligible for retry.
+   * <br/>
+   * If yes, transitions the stage to{@link ControllerStagePhase#RETRYING} and 
returns all the {@link WorkOrder}
+   * <br/>
+   * else throw {@link MSQException}
+   *
+   * @param workerNumber
+   * @param msqFault
+   * @return List of {@link WorkOrder} that needs to be retried.
+   */
+  public List<WorkOrder> getWorkInCaseWorkerElgibileForRetryElseThrow(int 
workerNumber, MSQFault msqFault)
+  {
+
+    final String errorCode;
+    if (msqFault instanceof WorkerFailedFault) {
+      errorCode = MSQFaultUtils.getErrorCodeFromMessage((((WorkerFailedFault) 
msqFault).getErrorMsg()));
+    } else {
+      errorCode = msqFault.getErrorCode();
+    }
+
+    if (retriableErrorCodes.contains(errorCode)) {
+      return getWorkInCaseWorkerElgibileForRetryElseThrow(workerNumber);
+
+    } else {
+      throw new MSQException(msqFault);
+    }
+  }
+
+  /**
+   * Gets all the stages currently being tracked and filters out all 
effectively finished stages.
+   * <br/>
+   * From the remaining stages, checks if (stage,worker) needs to be retried.
+   * <br/>
+   * If yes adds the workOrder for that stage to the return list and 
transitions the stage kernel to {@link ControllerStagePhase#RETRYING}
+   *
+   * @param worker
+   * @return List of {@link WorkOrder} that needs to be retried.
+   */
+  private List<WorkOrder> getWorkInCaseWorkerElgibileForRetryElseThrow(int 
worker)
+  {
+    List<StageId> trackedSet = new ArrayList<>(getActiveStages());
+    // no need to retry effectively finished stages
+    List<StageId> getEffictivelyFinishedStages = 
getEffectivelyFinishedStageIds();

Review Comment:
   nit: spelling



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStagePhase.java:
##########
@@ -35,7 +35,7 @@
     @Override
     public boolean canTransitionFrom(final ControllerStagePhase priorPhase)
     {
-      return false;
+      return true;

Review Comment:
   `NEW` should be the initial state and therefore initialized when we are 
creating the `ControllerStagePhase`. We shouldn't allow any transition to `NEW` 
I think.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -546,4 +614,65 @@ private static Map<StageId, Set<StageId>> 
computeStageOutflowMap(final QueryDefi
 
     return retVal;
   }
+
+  /**
+   * Checks the {@link MSQFault#getErrorCode()} is eligible for retry.
+   * <br/>
+   * If yes, transitions the stage to{@link ControllerStagePhase#RETRYING} and 
returns all the {@link WorkOrder}
+   * <br/>
+   * else throw {@link MSQException}
+   *
+   * @param workerNumber
+   * @param msqFault
+   * @return List of {@link WorkOrder} that needs to be retried.
+   */
+  public List<WorkOrder> getWorkInCaseWorkerElgibileForRetryElseThrow(int 
workerNumber, MSQFault msqFault)
+  {
+
+    final String errorCode;
+    if (msqFault instanceof WorkerFailedFault) {
+      errorCode = MSQFaultUtils.getErrorCodeFromMessage((((WorkerFailedFault) 
msqFault).getErrorMsg()));
+    } else {
+      errorCode = msqFault.getErrorCode();
+    }
+
+    if (retriableErrorCodes.contains(errorCode)) {
+      return getWorkInCaseWorkerElgibileForRetryElseThrow(workerNumber);
+
+    } else {
+      throw new MSQException(msqFault);
+    }
+  }
+
+  /**
+   * Gets all the stages currently being tracked and filters out all 
effectively finished stages.
+   * <br/>
+   * From the remaining stages, checks if (stage,worker) needs to be retried.
+   * <br/>
+   * If yes adds the workOrder for that stage to the return list and 
transitions the stage kernel to {@link ControllerStagePhase#RETRYING}
+   *
+   * @param worker
+   * @return List of {@link WorkOrder} that needs to be retried.
+   */
+  private List<WorkOrder> getWorkInCaseWorkerElgibileForRetryElseThrow(int 
worker)

Review Comment:
   This method is not thrown anywhere. We should rename it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to