This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fb38085ddb Add wait for worker shutdown to MSQ task cancel (#14198)
fb38085ddb is described below
commit fb38085ddbb439974325b230739e129ebcbf9ba1
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Sat May 6 04:59:59 2023 +0530
Add wait for worker shutdown to MSQ task cancel (#14198)
* Add wait for worker shutdown to MSQ task cancel
* Fix checkstyle
---
.../java/org/apache/druid/msq/exec/ControllerImpl.java | 2 ++
.../apache/druid/msq/indexing/MSQWorkerTaskLauncher.java | 16 ++++++++++------
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 90b4fc7191..850eb9d2e4 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -355,6 +355,8 @@ public class ControllerImpl implements Controller
throw new MSQException(CanceledFault.INSTANCE);
}
);
+
+ workerTaskLauncher.waitForWorkerShutdown();
}
public TaskStatus runTask(final Closer closer)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index 7295e62e91..6866ddb62e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -205,12 +205,7 @@ public class MSQWorkerTaskLauncher
}
// Block until stopped.
- try {
- FutureUtils.getUnchecked(stopFuture, false);
- }
- catch (Throwable ignored) {
- // Suppress.
- }
+ waitForWorkerShutdown();
}
/**
@@ -309,6 +304,15 @@ public class MSQWorkerTaskLauncher
}
}
+ public void waitForWorkerShutdown()
+ {
+ try {
+ FutureUtils.getUnchecked(stopFuture, false);
+ }
+ catch (Throwable ignored) {
+ // Suppress.
+ }
+ }
/**
* Checks if the controller has canceled the input taskId. This method is used in {@link ControllerImpl}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]