This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 037a9bdcb2 [To rel/1.1][IOTDB-5786] Fix potential deadlock in DriverScheduler
037a9bdcb2 is described below
commit 037a9bdcb20b88a9e5e2ff7ff9311a4471eb8795
Author: Liao Lanyu <[email protected]>
AuthorDate: Sat Apr 22 21:31:51 2023 +0800
[To rel/1.1][IOTDB-5786] Fix potential deadlock in DriverScheduler
---
.../db/mpp/execution/schedule/DriverScheduler.java | 116 ++++++++++-----------
1 file changed, 56 insertions(+), 60 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 50862859e6..c581f67709 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -254,13 +254,8 @@ public class DriverScheduler implements IDriverScheduler,
IService {
for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) {
if (fragmentRelatedTasks != null) {
for (DriverTask task : fragmentRelatedTasks) {
- task.lock();
- try {
-
task.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
- clearDriverTask(task);
- } finally {
- task.unlock();
- }
+
task.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
+ clearDriverTask(task);
}
}
}
@@ -279,13 +274,8 @@ public class DriverScheduler implements IDriverScheduler,
IService {
if (task == null) {
return;
}
- task.lock();
- try {
-
task.setAbortCause(DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED);
- clearDriverTask(task);
- } finally {
- task.unlock();
- }
+
task.setAbortCause(DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED);
+ clearDriverTask(task);
}
}
}
@@ -295,26 +285,31 @@ public class DriverScheduler implements IDriverScheduler,
IService {
private void clearDriverTask(DriverTask task) {
try (SetThreadName driverTaskName =
new SetThreadName(task.getDriver().getDriverTaskId().getFullId())) {
- DriverTaskStatus status = task.getStatus();
- switch (status) {
- // If it has been aborted, return directly
- case ABORTED:
- return;
- case READY:
- task.setStatus(DriverTaskStatus.ABORTED);
- readyQueue.remove(task.getDriverTaskId());
- break;
- case BLOCKED:
- task.setStatus(DriverTaskStatus.ABORTED);
- blockedTasks.remove(task);
- readyQueue.decreaseReservedSize();
- break;
- case RUNNING:
- task.setStatus(DriverTaskStatus.ABORTED);
- readyQueue.decreaseReservedSize();
- break;
- case FINISHED:
- break;
+ try {
+ task.lock();
+ DriverTaskStatus status = task.getStatus();
+ switch (status) {
+ // If it has been aborted, return directly
+ case ABORTED:
+ return;
+ case READY:
+ task.setStatus(DriverTaskStatus.ABORTED);
+ readyQueue.remove(task.getDriverTaskId());
+ break;
+ case BLOCKED:
+ task.setStatus(DriverTaskStatus.ABORTED);
+ blockedTasks.remove(task);
+ readyQueue.decreaseReservedSize();
+ break;
+ case RUNNING:
+ task.setStatus(DriverTaskStatus.ABORTED);
+ readyQueue.decreaseReservedSize();
+ break;
+ case FINISHED:
+ break;
+ }
+ } finally {
+ task.unlock();
}
timeoutQueue.remove(task.getDriverTaskId());
@@ -333,26 +328,31 @@ public class DriverScheduler implements IDriverScheduler,
IService {
queryMap.remove(task.getDriverTaskId().getQueryId());
}
}
- if (task.getAbortCause() != null) {
- try {
- task.getDriver()
- .failed(
- new DriverTaskAbortedException(
- task.getDriver().getDriverTaskId().getFullId(),
task.getAbortCause()));
- } catch (Exception e) {
- logger.error("Clear DriverTask failed", e);
+ try {
+ task.lock();
+ if (task.getAbortCause() != null) {
+ try {
+ task.getDriver()
+ .failed(
+ new DriverTaskAbortedException(
+ task.getDriver().getDriverTaskId().getFullId(),
task.getAbortCause()));
+ } catch (Exception e) {
+ logger.error("Clear DriverTask failed", e);
+ }
}
- }
- if (task.getStatus() == DriverTaskStatus.ABORTED) {
- try {
- blockManager.forceDeregisterFragmentInstance(
- new TFragmentInstanceId(
- task.getDriverTaskId().getQueryId().getId(),
- task.getDriverTaskId().getFragmentId().getId(),
-
task.getDriverTaskId().getFragmentInstanceId().getInstanceId()));
- } catch (Exception e) {
- logger.error("Clear DriverTask failed", e);
+ if (task.getStatus() == DriverTaskStatus.ABORTED) {
+ try {
+ blockManager.forceDeregisterFragmentInstance(
+ new TFragmentInstanceId(
+ task.getDriverTaskId().getQueryId().getId(),
+ task.getDriverTaskId().getFragmentId().getId(),
+
task.getDriverTaskId().getFragmentInstanceId().getInstanceId()));
+ } catch (Exception e) {
+ logger.error("Clear DriverTask failed", e);
+ }
}
+ } finally {
+ task.unlock();
}
}
}
@@ -488,6 +488,7 @@ public class DriverScheduler implements IDriverScheduler,
IService {
} finally {
task.unlock();
}
+ // wrap this clearDriverTask to avoid that status is changed to Aborted
during clearDriverTask
task.lock();
try {
clearDriverTask(task);
@@ -510,10 +511,10 @@ public class DriverScheduler implements IDriverScheduler,
IService {
logger.warn(
"The task {} is aborted. All other tasks in the same query will
be cancelled",
task.getDriverTaskId());
- clearDriverTask(task);
} finally {
task.unlock();
}
+ clearDriverTask(task);
QueryId queryId = task.getDriverTaskId().getQueryId();
Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks =
queryMap.remove(queryId);
if (queryRelatedTasks != null) {
@@ -524,13 +525,8 @@ public class DriverScheduler implements IDriverScheduler,
IService {
if (task.equals(otherTask)) {
continue;
}
- otherTask.lock();
- try {
-
otherTask.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
- clearDriverTask(otherTask);
- } finally {
- otherTask.unlock();
- }
+
otherTask.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
+ clearDriverTask(otherTask);
}
}
}