gianm closed pull request #6212: fix TaskQueue-HRTR deadlock
URL: https://github.com/apache/incubator-druid/pull/6212
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, its diff is supplied
below (GitHub would not otherwise display it):

diff --git 
a/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 
b/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 36fc44a80c3..42f8d5c22e3 100644
--- 
a/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -376,6 +376,7 @@ private boolean runTaskOnWorker(
       // on a worker - this avoids overflowing a worker with tasks
       long waitMs = 
config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
       long waitStart = System.currentTimeMillis();
+      boolean isTaskAssignmentTimedOut = false;
       synchronized (statusLock) {
         while (tasks.containsKey(taskId)
                && tasks.get(taskId).getState() == 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
@@ -383,29 +384,38 @@ private boolean runTaskOnWorker(
           if (remaining > 0) {
             statusLock.wait(remaining);
           } else {
-            log.makeAlert(
-                "Task assignment timed out on worker [%s], never ran task [%s] 
in timeout[%s]!",
-                workerHost,
-                taskId,
-                config.getTaskAssignmentTimeout()
-            ).emit();
-            taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
-            return true;
+            isTaskAssignmentTimedOut = true;
+            break;
           }
         }
-        return true;
       }
+
+      if (isTaskAssignmentTimedOut) {
+        log.makeAlert(
+            "Task assignment timed out on worker [%s], never ran task [%s] in 
timeout[%s]!",
+            workerHost,
+            taskId,
+            config.getTaskAssignmentTimeout()
+        ).emit();
+        taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
+      }
+
+      return true;
     } else {
       return false;
     }
   }
 
+  // CAUTION: This method calls RemoteTaskRunnerWorkItem.setResult(..) which 
results in TaskQueue.notifyStatus() being called
+  // because that is attached by TaskQueue to task result future. So, this 
method must not be called with "statusLock"
+  // held. See https://github.com/apache/incubator-druid/issues/6201
   private void taskComplete(
       HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
       WorkerHolder workerHolder,
       TaskStatus taskStatus
   )
   {
+    Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread 
must not hold statusLock.");
     Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
     Preconditions.checkNotNull(taskStatus, "taskStatus");
     if (workerHolder != null) {
@@ -1170,6 +1180,7 @@ void taskAddedOrUpdated(final TaskAnnouncement 
announcement, final WorkerHolder
 
     HttpRemoteTaskRunnerWorkItem taskItem;
     boolean shouldShutdownTask = false;
+    boolean isTaskCompleted = false;
 
     synchronized (statusLock) {
       taskItem = tasks.get(taskId);
@@ -1293,7 +1304,7 @@ void taskAddedOrUpdated(final TaskAnnouncement 
announcement, final WorkerHolder
                     TaskRunnerUtils.notifyLocationChanged(listeners, taskId, 
announcement.getTaskLocation());
                   }
 
-                  taskComplete(taskItem, workerHolder, 
announcement.getTaskStatus());
+                  isTaskCompleted = true;
                 } else {
                   log.warn(
                       "Worker[%s] reported completed task[%s] which is being 
run by another worker[%s]. Notification ignored.",
@@ -1327,6 +1338,10 @@ void taskAddedOrUpdated(final TaskAnnouncement 
announcement, final WorkerHolder
       }
     }
 
+    if (isTaskCompleted) {
+      taskComplete(taskItem, workerHolder, announcement.getTaskStatus());
+    }
+
     if (shouldShutdownTask) {
       log.warn("Killing task[%s] on worker[%s].", taskId, worker.getHost());
       workerHolder.shutdownTask(taskId);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to