georgew5656 commented on code in PR #15174:
URL: https://github.com/apache/druid/pull/15174#discussion_r1374833421


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2113,7 +2113,13 @@ public Boolean apply(Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdT
     for (int i = 0; i < results.size(); i++) {
       String taskId = futureTaskIds.get(i);
       if (results.get(i).isError() || results.get(i).valueOrThrow() == null) {
-        killTask(taskId, "Task [%s] failed to return status, killing task", taskId);
+        log.info("Task [%s] failed to respond, might need to kill it.", taskId);
+        Optional<TaskStatus> killTaskStatus = taskStorage.getStatus(taskId);
+        if (killTaskStatus.isPresent() && !killTaskStatus.get().isComplete()) {
+          // If the task completed while we were failing to chat with it, don't do anything.
+          log.info("Task [%s] failed to return status, killing task", taskId);
+          killTask(taskId, "Task [%s] failed to return status, killing task", taskId);

Review Comment:
   It seems like taskQueue should already handle this situation: if the task has been removed from the taskQueue.tasks map, shutdown does nothing.
   `
       giant.lock();
   
       try {
         final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId"));
         if (task != null) {
           notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args);
         }
       }
       finally {
         giant.unlock();
       }
   `
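
   To make the no-op behaviour concrete, here is a minimal sketch. MiniTaskQueue is a hypothetical stand-in, not Druid's TaskQueue. Its shutdown only records a failure for a task that is still in the tasks map, so a kill that arrives after the task has been removed is silently ignored. The failureNotifications list and the direct removal inside shutdown are assumptions made for illustration. The real method delegates to notifyStatus instead.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Objects;
   import java.util.concurrent.locks.ReentrantLock;

   // Hypothetical, simplified stand-in for TaskQueue (not Druid's actual class).
   class MiniTaskQueue {
     private final ReentrantLock giant = new ReentrantLock();
     // A task counts as tracked only while it is present in this map.
     private final Map<String, String> tasks = new HashMap<>();
     // Records failure notifications so the no-op case can be observed.
     final List<String> failureNotifications = new ArrayList<>();

     void add(String taskId) {
       giant.lock();
       try {
         tasks.put(taskId, taskId);
       } finally {
         giant.unlock();
       }
     }

     // Mirrors the quoted shutdown(): only a task still in the map gets a failure status.
     void shutdown(String taskId, String reasonFormat, Object... args) {
       giant.lock();
       try {
         String task = tasks.get(Objects.requireNonNull(taskId, "taskId"));
         if (task != null) {
           failureNotifications.add(taskId + ": " + String.format(reasonFormat, args));
           tasks.remove(taskId); // simplification: notifyStatus would remove it eventually
         }
       } finally {
         giant.unlock();
       }
     }

     // Simulates the success path finishing: the task leaves the map.
     void complete(String taskId) {
       giant.lock();
       try {
         tasks.remove(taskId);
       } finally {
         giant.unlock();
       }
     }
   }
   ```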
   
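   However, there is a potential race in the notifyStatus logic, which is what removes the taskId from the taskQueue.tasks map. The task's status is persisted first, and the task is removed from the map only at the end, under the giant lock. A shutdown that arrives in between therefore still finds the task.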
   `
       try {
         final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
         if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
           log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
         } else {
           taskStorage.setStatus(taskStatus.withLocation(taskLocation)); // <- this is where the task is marked as successful.
         }
       }
       catch (Throwable e) {
         // If persist fails, even after the retries performed in taskStorage, then metadata store and actual cluster
         // state have diverged. Send out an alert and continue with the task shutdown routine.
         log.makeAlert(e, "Failed to persist status for task")
            .addData("task", task.getId())
            .addData("statusCode", taskStatus.getStatusCode())
            .emit();
       }
   
       // Inform taskRunner that this task can be shut down.
       try {
         taskRunner.shutdown(task.getId(), reasonFormat, args);
       }
       catch (Throwable e) {
         // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to
         // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map.
         log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
       }
   
       // Critical section: remove this task from all of our tracking data structures.
       giant.lock();
       try {
         if (removeTaskInternal(task.getId())) { // <- this is where the task is removed from taskQueue.tasks
           taskFutures.remove(task.getId());
         } else {
           log.warn("Unknown task[%s] completed", task.getId());
         }
   
         recentlyCompletedTasks.remove(task.getId());
         requestManagement();
       }
       finally {
         giant.unlock();
       }
   `
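
   The sketch below is a single-threaded replay of that window. It assumes the supervisor's kill reaches shutdown after the success path has persisted SUCCESS but before the success path has removed the task from the map. NotifyWindowDemo and its methods are hypothetical stand-ins. No locking is modelled, and the success path's own taskRunner.shutdown call is omitted. Under these assumptions the stored status is not overwritten. The late kill still triggers the already-complete alert, a redundant taskRunner.shutdown, and an "unknown task" warning.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical model of the quoted notifyStatus/shutdown steps (not Druid code).
   class NotifyWindowDemo {
     enum Status { RUNNING, SUCCESS, FAILED }

     final Map<String, Status> storage = new HashMap<>(); // taskStorage stand-in
     final Set<String> tasks = new HashSet<>();           // TaskQueue.tasks stand-in
     final List<String> events = new ArrayList<>();       // what each step emitted

     // First half of notifyStatus: check the previous status, then persist.
     void persistStatus(String taskId, Status newStatus) {
       if (storage.get(taskId) != Status.RUNNING) {
         events.add("alert: ignoring notification for already-complete task " + taskId);
       } else {
         storage.put(taskId, newStatus);
       }
     }

     // Last step of notifyStatus: removal from the tracking map.
     void removeFromTasks(String taskId) {
       if (!tasks.remove(taskId)) {
         events.add("warn: unknown task " + taskId + " completed");
       }
     }

     // Mirrors the quoted shutdown(): acts only while the task is still in the map.
     void shutdown(String taskId) {
       if (tasks.contains(taskId)) {
         persistStatus(taskId, Status.FAILED);
         events.add("taskRunner.shutdown " + taskId);
         removeFromTasks(taskId);
       }
     }

     static NotifyWindowDemo replay() {
       NotifyWindowDemo q = new NotifyWindowDemo();
       q.storage.put("t1", Status.RUNNING);
       q.tasks.add("t1");
       q.persistStatus("t1", Status.SUCCESS); // success path marks the task successful...
       q.shutdown("t1");                      // ...a supervisor kill lands in the window...
       q.removeFromTasks("t1");               // ...before the success path removes the task.
       return q;
     }
   }
   ```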
   
   Short of locking the whole method (which could cause lock contention), I can't think of a good way to address this potential race.
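
   For comparison, here is a minimal sketch of the option set aside above. It holds one lock across the status check, the status write, and the removal from the map, so a shutdown can never see a task that is persisted as complete but still tracked. LockedNotifyQueue is hypothetical. The trade-off is that every notification, and anything else that takes giant, would wait behind the persistence call (in Druid, a metadata-store write with retries). That is the contention concern.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.locks.ReentrantLock;

   // Hypothetical queue that holds `giant` for the whole notify path (not Druid code).
   class LockedNotifyQueue {
     enum Status { RUNNING, SUCCESS, FAILED }

     private final ReentrantLock giant = new ReentrantLock();
     private final Map<String, Status> storage = new HashMap<>(); // taskStorage stand-in
     private final Set<String> tasks = new HashSet<>();           // TaskQueue.tasks stand-in

     void add(String taskId) {
       giant.lock();
       try {
         storage.put(taskId, Status.RUNNING);
         tasks.add(taskId);
       } finally {
         giant.unlock();
       }
     }

     // Check, persist and remove in one critical section. Returns false if the
     // notification was ignored because the task was already complete or untracked.
     boolean notifyStatus(String taskId, Status newStatus) {
       giant.lock();
       try {
         if (!tasks.contains(taskId) || storage.get(taskId) != Status.RUNNING) {
           return false;
         }
         storage.put(taskId, newStatus); // in Druid this would be a slow persistence call
         tasks.remove(taskId);
         return true;
       } finally {
         giant.unlock();
       }
     }

     // ReentrantLock lets shutdown call notifyStatus while already holding giant.
     boolean shutdown(String taskId) {
       giant.lock();
       try {
         return tasks.contains(taskId) && notifyStatus(taskId, Status.FAILED);
       } finally {
         giant.unlock();
       }
     }

     Status status(String taskId) {
       giant.lock();
       try {
         return storage.get(taskId);
       } finally {
         giant.unlock();
       }
     }
   }
   ```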


