This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 559d694ae5c Don't kill tasks while a supervisor is stopping. (#18767)
559d694ae5c is described below

commit 559d694ae5cf2b2328ccff6e45138bf02a0d6192
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Nov 21 07:36:27 2025 -0800

    Don't kill tasks while a supervisor is stopping. (#18767)
    
    * Don't kill tasks while a supervisor is stopping.
    
    Previously, if a supervisor was stopped while discovering tasks or
    updating their status, it could end up trying to kill those tasks, because
    the callbacks on the requests to those tasks could fail as a result of
    the supervisor stopping. There should be no reason for an actively-stopping
    supervisor to kill a task, so this patch makes the shutdown functions
    no-ops while a supervisor is stopping.
    
    This patch also ensures that when a supervisor transitions into the STOPPING
    state, that state takes priority over any other state.
---
 .../supervisor/SeekableStreamSupervisor.java       | 34 +++++++++++++++++++---
 .../supervisor/SupervisorStateManager.java         |  4 ++-
 2 files changed, 33 insertions(+), 5 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index ef65d6d22ca..0f2839e7fba 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -1744,7 +1744,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       checkIfStreamInactiveAndTurnSupervisorIdle();
 
       // If supervisor is already stopping, don't contend for stateChangeLock 
since the block can be skipped
-      if 
(stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING))
 {
+      if (isStopping()) {
         logDebugReport();
         return;
       }
@@ -1752,7 +1752,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       synchronized (stateChangeLock) {
         // if supervisor is not suspended, ensure required tasks are running
         // if suspended, ensure tasks have been requested to gracefully stop
-        if 
(stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING))
 {
+        if (isStopping()) {
           // if we're already terminating, don't do anything here, the 
terminate already handles shutdown
           log.debug("Supervisor[%s] for datasource[%s] is already stopping.", 
supervisorId, dataSource);
         } else if (stateManager.isIdle()) {
@@ -2024,7 +2024,16 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   {
     Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
     if (taskQueue.isPresent()) {
-      taskQueue.get().shutdown(id, reasonFormat, args);
+      if (isStopping()) {
+        log.debug(
+            "Not shutting down task[%s] because the supervisor[%s] has been 
stopped. Reason was[%s]",
+            id,
+            supervisorId,
+            StringUtils.format(reasonFormat, args)
+        );
+      } else {
+        taskQueue.get().shutdown(id, reasonFormat, args);
+      }
     } else {
       log.error("Failed to get task queue because I'm not the leader!");
     }
@@ -2034,7 +2043,16 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   {
     Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
     if (taskQueue.isPresent()) {
-      taskQueue.get().shutdownWithSuccess(id, reasonFormat, args);
+      if (isStopping()) {
+        log.debug(
+            "Not shutting down task[%s] because the supervisor[%s] has been 
stopped. Reason was[%s]",
+            id,
+            supervisorId,
+            StringUtils.format(reasonFormat, args)
+        );
+      } else {
+        taskQueue.get().shutdownWithSuccess(id, reasonFormat, args);
+      }
     } else {
       log.error("Failed to get task queue because I'm not the leader!");
     }
@@ -4562,6 +4580,14 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  /**
+   * Whether this supervisor is in a {@link 
SupervisorStateManager.BasicState#STOPPING} state.
+   */
+  private boolean isStopping()
+  {
+    return 
stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING);
+  }
+
   /**
    * Call {@link FutureUtils#coalesce} on the provided list, and wait for the 
result.
    */
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
index f30517d1b22..f64f390fc56 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
@@ -127,7 +127,9 @@ public class SupervisorStateManager
    */
   public synchronized void maybeSetState(State proposedState)
   {
-    if (BasicState.STOPPING.equals(this.supervisorState)) {
+    if (BasicState.STOPPING.equals(this.supervisorState) || 
BasicState.STOPPING.equals(proposedState)) {
+      // STOPPING takes precedence over all other states
+      supervisorState = BasicState.STOPPING;
       return;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to