LakshSingla commented on code in PR #13353:
URL: https://github.com/apache/druid/pull/13353#discussion_r1024799188


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java:
##########
@@ -107,6 +117,19 @@
    */
   private final Set<StageId> effectivelyFinishedStages = new HashSet<>();
 
+
+  /**
+   * Store the work orders for the stage so that we can retrieve that in case 
of worker retry
+   */
+  private final Map<StageId, Int2ObjectMap<WorkOrder>> stageWorkOrders;
+
+  /**
+   * {@link MSQFault#getErrorCode()} which are retried.
+   */
+  private final Set<String> retriableErrorCodes = 
ImmutableSet.of(CanceledFault.CODE, UnknownFault.CODE,

Review Comment:
   I think we should define this set of retriable error codes in a separate
constants file (a new one would do) if you expect it to be needed in other
places as well.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java:
##########
@@ -398,33 +458,125 @@ private void updateTaskTrackersAndTaskIds()
   /**
    * Used by the main loop to generate exceptions if any tasks have failed, 
have taken too long to start up, or
    * have gone inexplicably missing.
-   *
+   * <p>
    * Throws an exception if some task is erroneous.
    */
   private void checkForErroneousTasks()
   {
     final int numTasks = taskTrackers.size();
 
-    for (final Map.Entry<String, TaskTracker> taskEntry : 
taskTrackers.entrySet()) {
+    Iterator<Map.Entry<String, TaskTracker>> taskTrackerIterator = 
taskTrackers.entrySet().iterator();
+
+    while (taskTrackerIterator.hasNext()) {
+      final Map.Entry<String, TaskTracker> taskEntry = 
taskTrackerIterator.next();
       final String taskId = taskEntry.getKey();
       final TaskTracker tracker = taskEntry.getValue();
-
-      if (tracker.status == null) {
-        throw new 
MSQException(UnknownFault.forMessage(StringUtils.format("Task [%s] status 
missing", taskId)));
+      if (tracker.isRetrying()) {
+        continue;
       }
 
-      if (tracker.didRunTimeOut(maxTaskStartDelayMillis) && 
!canceledWorkerTasks.contains(taskId)) {
+      if (tracker.status == null) {
+        removeWorkerFromFullyStartedWorkers(tracker);
+        final String errorMessage = StringUtils.format("Task [%s] status 
missing", taskId);
+        log.info(errorMessage + ". Trying to relaunch the worker");
+        tracker.enableRetrying();
+        retryTask.retry(
+            tracker.msqWorkerTask,
+            UnknownFault.forMessage(errorMessage)
+        );
+
+      } else if (tracker.didRunTimeOut(maxTaskStartDelayMillis) && 
!canceledWorkerTasks.contains(taskId)) {
+        removeWorkerFromFullyStartedWorkers(tracker);
         throw new MSQException(new TaskStartTimeoutFault(numTasks + 1));
+      } else if (tracker.didFail() && !canceledWorkerTasks.contains(taskId)) {
+        removeWorkerFromFullyStartedWorkers(tracker);
+        log.info("Task[%s] failed because %s. Trying to relaunch the worker", 
taskId, tracker.status.getErrorMsg());
+        tracker.enableRetrying();
+        retryTask.retry(tracker.msqWorkerTask, new WorkerFailedFault(taskId, 
tracker.status.getErrorMsg()));
       }
+    }
+  }
 
-      if (tracker.didFail() && !canceledWorkerTasks.contains(taskId)) {
-        throw new MSQException(new WorkerFailedFault(taskId, 
tracker.status.getErrorMsg()));
-      }
+  private void removeWorkerFromFullyStartedWorkers(TaskTracker tracker)
+  {
+    synchronized (taskIds) {
+      fullyStartedTasks.remove(tracker.msqWorkerTask.getWorkerNumber());
+    }
+  }
+
+
+  private void relaunchTasks()
+  {
+    Iterator<Integer> iterator = workersToRelaunch.iterator();
+
+    while (iterator.hasNext()) {
+      int worker = iterator.next();
+      workerToTaskIds.compute(worker, (workerId, taskHistory) -> {
+
+        if (taskHistory == null || taskHistory.isEmpty()) {
+          throw new ISE("TaskHistory cannot by null for worker %d", workerId);
+        }
+        String latestTaskId = taskHistory.get(taskHistory.size() - 1);
+
+        TaskTracker tracker = taskTrackers.get(latestTaskId);
+        if (tracker == null) {
+          throw new ISE("Did not find taskTracker for latest taskId[%s]", 
latestTaskId);
+        }
+        // if task is not failed donot retry
+        if (!tracker.isComplete()) {
+          return taskHistory;
+        }
+
+        MSQWorkerTask toRelaunch = tracker.msqWorkerTask;
+        MSQWorkerTask relaunchedTask = toRelaunch.getRetryTask();
+
+        // check relaunch limits
+        checkRelaunchLimits(tracker, toRelaunch);
+        // clean up trackers and tasks
+        tasksToCleanup.add(latestTaskId);
+        taskTrackers.remove(latestTaskId);
+        currentRelaunchCount += 1;
+        taskTrackers.put(relaunchedTask.getId(), new 
TaskTracker(relaunchedTask.getWorkerNumber(), relaunchedTask));
+        context.workerManager().run(relaunchedTask.getId(), relaunchedTask);
+        taskHistory.add(relaunchedTask.getId());
+
+        synchronized (taskIds) {
+          // replace taskId with the retry taskID for the same worker number
+          taskIds.set(toRelaunch.getWorkerNumber(), relaunchedTask.getId());
+          fullyStartedTasks.remove(relaunchedTask.getWorkerNumber());
+          taskIds.notifyAll();
+        }
+        return taskHistory;
+
+      });
+      iterator.remove();
+    }
+  }
+
+  private void checkRelaunchLimits(TaskTracker tracker, MSQWorkerTask 
relaunchTask)

Review Comment:
   The method name (or signature) should make it clear that this method can throw an exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to