FrankChen021 commented on code in PR #19538:
URL: https://github.com/apache/druid/pull/19538#discussion_r3341106768
##########
multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java:
##########
@@ -250,15 +275,47 @@ public void start()
@LifecycleStop
public void stop()
{
+ final List<WorkerHolder> runningWorkers;
synchronized (this) {
- final Collection<WorkerHolder> holders = workerMap.values();
-
- for (final WorkerHolder holder : holders) {
- holder.runRef.cancel();
+ if (stopped) {
+ return;
}
+ stopped = true;
+ runningWorkers = new ArrayList<>(workerMap.values());
+ workerMap.clear();
+ }
- for (final WorkerHolder holder : holders) {
- holder.runRef.awaitStop();
+ if (runningWorkers.isEmpty()) {
+ return;
+ }
+
+ // Wait for workers to exit outside the lock.
+ final DateTime waitStart = DateTimes.utc(clock.millis());
+ final DateTime deadline = waitStart.plus(serverConfig.getGracefulShutdownTimeout());
+
+ log.info(
+ "Waiting until[%s] for queries[%s] to stop.",
+ deadline,
+ runningWorkers.stream().map(holder -> holder.workerContext.queryId()).collect(Collectors.joining(", "))
+ );
+
+ for (final WorkerHolder holder : runningWorkers) {
+ try {
+ final long timeout = deadline.getMillis() - clock.millis();
+ if (timeout <= 0 || !holder.runRef.awaitStop(timeout, TimeUnit.MILLISECONDS)) {
+ log.warn(
+ "Canceling work for query[%s] due to timeout during stop (waited [%,d] ms)",
+ holder.workerContext.queryId(),
+ clock.millis() - waitStart.getMillis()
+ );
+
+ holder.runRef.cancel();
+ holder.runRef.awaitStop();
+ }
+ }
+ catch (InterruptedException e) {
Review Comment:
[P2] Cancel remaining workers when shutdown wait is interrupted
If the lifecycle stop thread is interrupted while waiting for a worker,
awaitStop throws InterruptedException and this catch block exits without
canceling the current worker or any of the remaining runningWorkers. Because
stopped is already true and workerMap has already been cleared, a later stop()
call returns immediately. Those background worker threads then keep running
instead of being canceled. The interrupt path should cancel the copied workers
before returning or rethrowing.
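Below is a minimal, self-contained sketch of one way to handle the interrupt path. It makes several assumptions. RunRef is a hypothetical stand-in for the type of holder.runRef. It is assumed to expose cancel(), awaitStop(), and awaitStop(long, TimeUnit) as the diff uses them, and cancel() is assumed to be idempotent. ShutdownSketch, stopAll, and FakeRef are illustrative names and are not part of Druid. On interrupt, the sketch cancels the current worker and every worker not yet visited. It then restores the interrupt flag rather than rethrowing, which is one of the two options the comment allows.

```java
import java.time.Clock;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch
{
  /** Hypothetical stand-in for holder.runRef in the diff; not Druid's actual type. */
  public interface RunRef
  {
    void cancel(); // assumed idempotent

    boolean awaitStop(long timeout, TimeUnit unit) throws InterruptedException;

    void awaitStop() throws InterruptedException;
  }

  /**
   * Waits for each worker until a shared deadline and cancels any that overrun it.
   * On interruption, cancels the current worker and all not-yet-visited workers,
   * then restores the thread's interrupt flag instead of swallowing it.
   */
  public static void stopAll(List<? extends RunRef> runningWorkers, long gracefulTimeoutMillis, Clock clock)
  {
    final long deadline = clock.millis() + gracefulTimeoutMillis;
    for (int i = 0; i < runningWorkers.size(); i++) {
      final RunRef ref = runningWorkers.get(i);
      try {
        final long timeout = deadline - clock.millis();
        if (timeout <= 0 || !ref.awaitStop(timeout, TimeUnit.MILLISECONDS)) {
          ref.cancel();
          ref.awaitStop();
        }
      }
      catch (InterruptedException e) {
        // stop() will not run again (stopped is true and the map was cleared),
        // so this is the last chance to cancel workers i..end.
        for (int j = i; j < runningWorkers.size(); j++) {
          runningWorkers.get(j).cancel();
        }
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /** Test double: either finishes in time, or simulates an interrupt while waiting. */
  public static final class FakeRef implements RunRef
  {
    private final boolean interruptOnWait;
    private final boolean finishesInTime;
    public volatile boolean canceled;

    public FakeRef(boolean interruptOnWait, boolean finishesInTime)
    {
      this.interruptOnWait = interruptOnWait;
      this.finishesInTime = finishesInTime;
    }

    @Override
    public void cancel()
    {
      canceled = true;
    }

    @Override
    public boolean awaitStop(long timeout, TimeUnit unit) throws InterruptedException
    {
      if (interruptOnWait) {
        throw new InterruptedException("simulated interrupt");
      }
      return finishesInTime;
    }

    @Override
    public void awaitStop()
    {
      // In this double, cancellation takes effect immediately.
    }
  }

  public static void main(String[] args)
  {
    FakeRef done = new FakeRef(false, true);
    FakeRef interrupted = new FakeRef(true, false);
    FakeRef remaining = new FakeRef(false, true);
    stopAll(List.of(done, interrupted, remaining), 1_000, Clock.systemUTC());
    // Prints "false true true true"; Thread.interrupted() also clears the restored flag.
    System.out.println(done.canceled + " " + interrupted.canceled + " " + remaining.canceled + " " + Thread.interrupted());
  }
}
```

The worker that stopped in time is left alone. The interrupted worker and the one after it are both canceled, which is exactly the set the comment says the current code leaks. This mirrors the "cancel, then re-interrupt" shape used for executor shutdown in the JDK. Whether to rethrow or restore the flag depends on how the lifecycle framework treats exceptions from stop().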
--
This is an automated message from the Apache Git Service.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]