This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new c3aaf81 fix TaskQueue-HRTR deadlock (#6212)
c3aaf81 is described below
commit c3aaf8122d658fce578e75ccbba18f7c74b8114f
Author: Himanshu <[email protected]>
AuthorDate: Sat Aug 25 14:15:57 2018 -0700
fix TaskQueue-HRTR deadlock (#6212)
* fix TaskQueue-HRTR deadlock causing
https://github.com/apache/incubator-druid/issues/6201
* address review comments
---
.../overlord/hrtr/HttpRemoteTaskRunner.java | 35 +++++++++++++++-------
1 file changed, 25 insertions(+), 10 deletions(-)
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 36fc44a..42f8d5c 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -376,6 +376,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
// on a worker - this avoids overflowing a worker with tasks
long waitMs =
config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
long waitStart = System.currentTimeMillis();
+ boolean isTaskAssignmentTimedOut = false;
synchronized (statusLock) {
while (tasks.containsKey(taskId)
&& tasks.get(taskId).getState() ==
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
@@ -383,29 +384,38 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
if (remaining > 0) {
statusLock.wait(remaining);
} else {
- log.makeAlert(
- "Task assignment timed out on worker [%s], never ran task [%s]
in timeout[%s]!",
- workerHost,
- taskId,
- config.getTaskAssignmentTimeout()
- ).emit();
- taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
- return true;
+ isTaskAssignmentTimedOut = true;
+ break;
}
}
- return true;
}
+
+ if (isTaskAssignmentTimedOut) {
+ log.makeAlert(
+ "Task assignment timed out on worker [%s], never ran task [%s] in
timeout[%s]!",
+ workerHost,
+ taskId,
+ config.getTaskAssignmentTimeout()
+ ).emit();
+ taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
+ }
+
+ return true;
} else {
return false;
}
}
+ // CAUTION: This method calls RemoteTaskRunnerWorkItem.setResult(..) which
results in TaskQueue.notifyStatus() being called
+ // because that is attached by TaskQueue to task result future. So, this
method must not be called with "statusLock"
+ // held. See https://github.com/apache/incubator-druid/issues/6201
private void taskComplete(
HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
WorkerHolder workerHolder,
TaskStatus taskStatus
)
{
+ Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread
must not hold statusLock.");
Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
Preconditions.checkNotNull(taskStatus, "taskStatus");
if (workerHolder != null) {
@@ -1170,6 +1180,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
HttpRemoteTaskRunnerWorkItem taskItem;
boolean shouldShutdownTask = false;
+ boolean isTaskCompleted = false;
synchronized (statusLock) {
taskItem = tasks.get(taskId);
@@ -1293,7 +1304,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
TaskRunnerUtils.notifyLocationChanged(listeners, taskId,
announcement.getTaskLocation());
}
- taskComplete(taskItem, workerHolder,
announcement.getTaskStatus());
+ isTaskCompleted = true;
} else {
log.warn(
"Worker[%s] reported completed task[%s] which is being
run by another worker[%s]. Notification ignored.",
@@ -1327,6 +1338,10 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
+ if (isTaskCompleted) {
+ taskComplete(taskItem, workerHolder, announcement.getTaskStatus());
+ }
+
if (shouldShutdownTask) {
log.warn("Killing task[%s] on worker[%s].", taskId, worker.getHost());
workerHolder.shutdownTask(taskId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]