This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c3aaf81  fix TaskQueue-HRTR deadlock (#6212)
c3aaf81 is described below

commit c3aaf8122d658fce578e75ccbba18f7c74b8114f
Author: Himanshu <[email protected]>
AuthorDate: Sat Aug 25 14:15:57 2018 -0700

    fix TaskQueue-HRTR deadlock (#6212)
    
    * fix TaskQueue-HRTR deadlock causing https://github.com/apache/incubator-druid/issues/6201
    
    * address review comments
---
 .../overlord/hrtr/HttpRemoteTaskRunner.java        | 35 +++++++++++++++-------
 1 file changed, 25 insertions(+), 10 deletions(-)

diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 36fc44a..42f8d5c 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -376,6 +376,7 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
       // on a worker - this avoids overflowing a worker with tasks
       long waitMs = 
config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
       long waitStart = System.currentTimeMillis();
+      boolean isTaskAssignmentTimedOut = false;
       synchronized (statusLock) {
         while (tasks.containsKey(taskId)
                && tasks.get(taskId).getState() == 
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
@@ -383,29 +384,38 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
           if (remaining > 0) {
             statusLock.wait(remaining);
           } else {
-            log.makeAlert(
-                "Task assignment timed out on worker [%s], never ran task [%s] 
in timeout[%s]!",
-                workerHost,
-                taskId,
-                config.getTaskAssignmentTimeout()
-            ).emit();
-            taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
-            return true;
+            isTaskAssignmentTimedOut = true;
+            break;
           }
         }
-        return true;
       }
+
+      if (isTaskAssignmentTimedOut) {
+        log.makeAlert(
+            "Task assignment timed out on worker [%s], never ran task [%s] in 
timeout[%s]!",
+            workerHost,
+            taskId,
+            config.getTaskAssignmentTimeout()
+        ).emit();
+        taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
+      }
+
+      return true;
     } else {
       return false;
     }
   }
 
+  // CAUTION: This method calls RemoteTaskRunnerWorkItem.setResult(..) which results in TaskQueue.notifyStatus() being called
+  // because that is attached by TaskQueue to task result future. So, this method must not be called with "statusLock"
+  // held. See https://github.com/apache/incubator-druid/issues/6201
   private void taskComplete(
       HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
       WorkerHolder workerHolder,
       TaskStatus taskStatus
   )
   {
+    Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread 
must not hold statusLock.");
     Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
     Preconditions.checkNotNull(taskStatus, "taskStatus");
     if (workerHolder != null) {
@@ -1170,6 +1180,7 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
 
     HttpRemoteTaskRunnerWorkItem taskItem;
     boolean shouldShutdownTask = false;
+    boolean isTaskCompleted = false;
 
     synchronized (statusLock) {
       taskItem = tasks.get(taskId);
@@ -1293,7 +1304,7 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
                     TaskRunnerUtils.notifyLocationChanged(listeners, taskId, 
announcement.getTaskLocation());
                   }
 
-                  taskComplete(taskItem, workerHolder, 
announcement.getTaskStatus());
+                  isTaskCompleted = true;
                 } else {
                   log.warn(
                       "Worker[%s] reported completed task[%s] which is being 
run by another worker[%s]. Notification ignored.",
@@ -1327,6 +1338,10 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
       }
     }
 
+    if (isTaskCompleted) {
+      taskComplete(taskItem, workerHolder, announcement.getTaskStatus());
+    }
+
     if (shouldShutdownTask) {
       log.warn("Killing task[%s] on worker[%s].", taskId, worker.getHost());
       workerHolder.shutdownTask(taskId);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to