This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 559d694ae5c Don't kill tasks while a supervisor is stopping. (#18767)
559d694ae5c is described below
commit 559d694ae5cf2b2328ccff6e45138bf02a0d6192
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Nov 21 07:36:27 2025 -0800
Don't kill tasks while a supervisor is stopping. (#18767)
* Don't kill tasks while a supervisor is stopping.
Previously, if a supervisor was stopped while discovering tasks or
updating their status, it could end up trying to kill those tasks, because
the callbacks on the requests to those tasks could fail as a result of
the supervisor stopping. An actively-stopping supervisor has no reason
to kill a task, so this patch makes the shutdown functions no-ops
while a supervisor is stopping.
This patch also ensures that once a supervisor transitions into the STOPPING
state, STOPPING takes priority over any other proposed state.
---
.../supervisor/SeekableStreamSupervisor.java | 34 +++++++++++++++++++---
.../supervisor/SupervisorStateManager.java | 4 ++-
2 files changed, 33 insertions(+), 5 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index ef65d6d22ca..0f2839e7fba 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -1744,7 +1744,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
checkIfStreamInactiveAndTurnSupervisorIdle();
// If supervisor is already stopping, don't contend for stateChangeLock
since the block can be skipped
- if
(stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING))
{
+ if (isStopping()) {
logDebugReport();
return;
}
@@ -1752,7 +1752,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
synchronized (stateChangeLock) {
// if supervisor is not suspended, ensure required tasks are running
// if suspended, ensure tasks have been requested to gracefully stop
- if
(stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING))
{
+ if (isStopping()) {
// if we're already terminating, don't do anything here, the
terminate already handles shutdown
log.debug("Supervisor[%s] for datasource[%s] is already stopping.",
supervisorId, dataSource);
} else if (stateManager.isIdle()) {
@@ -2024,7 +2024,16 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
{
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) {
- taskQueue.get().shutdown(id, reasonFormat, args);
+ if (isStopping()) {
+ log.debug(
+ "Not shutting down task[%s] because the supervisor[%s] has been
stopped. Reason was[%s]",
+ id,
+ supervisorId,
+ StringUtils.format(reasonFormat, args)
+ );
+ } else {
+ taskQueue.get().shutdown(id, reasonFormat, args);
+ }
} else {
log.error("Failed to get task queue because I'm not the leader!");
}
@@ -2034,7 +2043,16 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
{
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) {
- taskQueue.get().shutdownWithSuccess(id, reasonFormat, args);
+ if (isStopping()) {
+ log.debug(
+ "Not shutting down task[%s] because the supervisor[%s] has been
stopped. Reason was[%s]",
+ id,
+ supervisorId,
+ StringUtils.format(reasonFormat, args)
+ );
+ } else {
+ taskQueue.get().shutdownWithSuccess(id, reasonFormat, args);
+ }
} else {
log.error("Failed to get task queue because I'm not the leader!");
}
@@ -4562,6 +4580,14 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ /**
+ * Whether this supervisor is in a {@link
SupervisorStateManager.BasicState#STOPPING} state.
+ */
+ private boolean isStopping()
+ {
+ return
stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING);
+ }
+
/**
* Call {@link FutureUtils#coalesce} on the provided list, and wait for the
result.
*/
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
index f30517d1b22..f64f390fc56 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
@@ -127,7 +127,9 @@ public class SupervisorStateManager
*/
public synchronized void maybeSetState(State proposedState)
{
- if (BasicState.STOPPING.equals(this.supervisorState)) {
+ if (BasicState.STOPPING.equals(this.supervisorState) ||
BasicState.STOPPING.equals(proposedState)) {
+ // STOPPING takes precedence over all other states
+ supervisorState = BasicState.STOPPING;
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]