This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b847c86a58 Add task context parameter `subTaskTimeoutMillis` for index_parallel tasks (#18039)
5b847c86a58 is described below

commit 5b847c86a5820b08cd4510902bc066bb3034e8a4
Author: Misha <[email protected]>
AuthorDate: Wed Jun 4 07:52:36 2025 +0200

    Add task context parameter `subTaskTimeoutMillis` for index_parallel tasks (#18039)
    
    Changes
    ---------
    - Add task context parameter `subTaskTimeoutMillis`
    - Update `TaskMonitor` to cancel a sub-task if it exceeds timeout
    - Do not retry a sub-task that has been cancelled due to timeout
    
    Key benefits
    ------------
    - Fail fast: instead of waiting forever, a stuck sub-task is cancelled once it exceeds the timeout and is counted as failed, so the parent job fails instead of hanging.
    - Free slots: stuck ("zombie") sub-tasks no longer tie up worker slots for hours or days.
    - Automate recovery: the resulting failure flows into the existing retry/alert pipelines, so operators can fix the problem or let automation spin up a fresh attempt.
---
 docs/ingestion/tasks.md                            |  1 +
 .../apache/druid/indexing/common/task/Tasks.java   |  2 ++
 .../batch/parallel/ParallelIndexPhaseRunner.java   | 10 +++++-
 .../common/task/batch/parallel/TaskMonitor.java    | 28 ++++++++++++++--
 .../task/batch/parallel/TaskMonitorTest.java       | 38 ++++++++++++++++++++--
 5 files changed, 73 insertions(+), 6 deletions(-)

diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index 9396378bcdb..2dc82b7dc49 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -472,6 +472,7 @@ The following parameters apply to all task types.
 |`useLineageBasedSegmentAllocation`|Enables the new lineage-based segment 
allocation protocol for the native Parallel task with dynamic partitioning. 
This option should be off during the replacing rolling upgrade from one of the 
Druid versions between 0.19 and 0.21 to Druid 0.22 or higher. Once the upgrade 
is done, it must be set to `true` to ensure data correctness.|`false` in 0.21 
or earlier, `true` in 0.22 or later|
 |`lookupLoadingMode`|Controls the lookup loading behavior in tasks. This 
property supports three values: `ALL` mode loads all the lookups, `NONE` mode 
does not load any lookups and `ONLY_REQUIRED` mode loads the lookups specified 
with context key `lookupsToLoad`. This property must not be specified for `MSQ` 
and `kill` tasks as the task engine enforces `ONLY_REQUIRED` mode for 
`MSQWorkerTask` and `NONE` mode for `MSQControllerTask` and `kill` tasks.|`ALL`|
 |`lookupsToLoad`|List of lookup names to load in tasks. This property is 
required only if the `lookupLoadingMode` is set to `ONLY_REQUIRED`. For 
`MSQWorkerTask` type, the lookup names to load are identified by the controller 
task by parsing the SQL. |`null`|
+|`subTaskTimeoutMillis`|Maximum time, in milliseconds, that the parent task waits for a sub-task before cancelling it. A sub-task cancelled because of this timeout is counted as failed and is not retried. Applies only to `index_parallel` tasks and to `compact` tasks running in parallel mode. Set to 0 (or any non-positive value) to disable the timeout.|0 (no timeout)|
 
 ## Task logs
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index 8e62113f0c7..90fb67116b9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -44,6 +44,7 @@ public class Tasks
 
   public static final int DEFAULT_TASK_PRIORITY = 0;
   public static final long DEFAULT_LOCK_TIMEOUT_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
+  public static final long DEFAULT_SUB_TASK_TIMEOUT_MILLIS = 0;
   public static final boolean DEFAULT_FORCE_TIME_CHUNK_LOCK = true;
   public static final boolean DEFAULT_STORE_COMPACTION_STATE = false;
   public static final boolean DEFAULT_USE_MAX_MEMORY_ESTIMATES = false;
@@ -52,6 +53,7 @@ public class Tasks
 
   public static final String PRIORITY_KEY = "priority";
   public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
+  public static final String SUB_TASK_TIMEOUT_KEY = "subTaskTimeoutMillis";
   public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
   public static final String STORE_EMPTY_COLUMNS_KEY = "storeEmptyColumns";
   public static final String USE_SHARED_LOCK = "useSharedLock";
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
index 579c1e492c3..23e344709fe 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
@@ -29,6 +29,7 @@ import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.Tasks;
 import 
org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.MonitorEntry;
 import 
org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
 import org.apache.druid.java.util.common.ISE;
@@ -123,7 +124,8 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType 
extends Task, SubTask
     taskMonitor = new TaskMonitor<>(
         toolbox.getOverlordClient(),
         tuningConfig.getMaxRetry(),
-        estimateTotalNumSubTasks()
+        estimateTotalNumSubTasks(),
+        getSubtaskTimeoutMillisFromContext()
     );
     TaskState state = TaskState.RUNNING;
 
@@ -472,4 +474,10 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType 
extends Task, SubTask
   {
     return nextSpecId++;
   }
+
+  private long getSubtaskTimeoutMillisFromContext()
+  {
+    return ((Number) context.getOrDefault(Tasks.SUB_TASK_TIMEOUT_KEY,
+            Tasks.DEFAULT_SUB_TASK_TIMEOUT_MILLIS)).longValue();
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
index 8db78a10b58..1fd84a36f61 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
@@ -31,6 +31,7 @@ import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.rpc.StandardRetryPolicy;
@@ -88,6 +89,7 @@ public class TaskMonitor<T extends Task, SubTaskReportType 
extends SubTaskReport
   private final OverlordClient overlordClient;
   private final int maxRetry;
   private final int estimatedNumSucceededTasks;
+  private final long taskTimeoutMillis;
 
   @GuardedBy("taskCountLock")
   private int numRunningTasks;
@@ -106,15 +108,15 @@ public class TaskMonitor<T extends Task, 
SubTaskReportType extends SubTaskReport
   @GuardedBy("startStopLock")
   private boolean running = false;
 
-  TaskMonitor(OverlordClient overlordClient, int maxRetry, int 
estimatedNumSucceededTasks)
+  TaskMonitor(OverlordClient overlordClient, int maxRetry, int 
estimatedNumSucceededTasks, long taskTimeoutMillis)
   {
     // Unlimited retries for Overlord APIs: if it goes away, we'll wait 
indefinitely for it to come back.
     this.overlordClient = Preconditions.checkNotNull(overlordClient, 
"overlordClient")
                                        
.withRetryPolicy(StandardRetryPolicy.unlimited());
     this.maxRetry = maxRetry;
     this.estimatedNumSucceededTasks = estimatedNumSucceededTasks;
-
-    log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", 
estimatedNumSucceededTasks);
+    this.taskTimeoutMillis = taskTimeoutMillis;
+    log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d] 
and sub-task timeout[%d millis].", estimatedNumSucceededTasks, 
taskTimeoutMillis);
   }
 
   public void start(long taskStatusCheckingPeriod)
@@ -134,6 +136,19 @@ public class TaskMonitor<T extends Task, SubTaskReportType 
extends SubTaskReport
                 final MonitorEntry monitorEntry = entry.getValue();
                 final String taskId = monitorEntry.runningTask.getId();
 
+                final long elapsed = 
monitorEntry.getStopwatch().millisElapsed();
+                if (taskTimeoutMillis > 0 && elapsed > taskTimeoutMillis) {
+                  log.warn("Cancelling task[%s] as it has already run for [%d] 
millis (taskTimeoutMillis=[%d]).", taskId, elapsed, taskTimeoutMillis);
+                  FutureUtils.getUnchecked(overlordClient.cancelTask(taskId), 
true);
+                  final TaskStatusPlus cancelledTaskStatus = 
FutureUtils.getUnchecked(
+                          overlordClient.taskStatus(taskId), true).getStatus();
+                  reportsMap.remove(taskId);
+                  incrementNumFailedTasks();
+                  monitorEntry.setLastStatus(cancelledTaskStatus);
+                  iterator.remove();
+                  continue;
+                }
+
                 // Could improve this by switching to the bulk taskStatuses 
API.
                 final TaskStatusResponse taskStatusResponse =
                     
FutureUtils.getUnchecked(overlordClient.taskStatus(taskId), true);
@@ -441,6 +456,7 @@ public class TaskMonitor<T extends Task, SubTaskReportType 
extends SubTaskReport
     // old tasks to recent tasks. running task is not included
     private final CopyOnWriteArrayList<TaskStatusPlus> taskHistory;
     private final SettableFuture<SubTaskCompleteEvent<T>> completeEventFuture;
+    private final Stopwatch stopwatch;
 
     /**
      * This variable is updated inside of the {@link 
java.util.concurrent.Callable} executed by
@@ -472,6 +488,7 @@ public class TaskMonitor<T extends Task, SubTaskReportType 
extends SubTaskReport
       this.runningStatus = runningStatus;
       this.taskHistory = taskHistory;
       this.completeEventFuture = completeEventFuture;
+      this.stopwatch = Stopwatch.createStarted();
     }
 
     MonitorEntry withNewRunningTask(T newTask, @Nullable TaskStatusPlus 
newStatus, TaskStatusPlus statusOfLastTask)
@@ -534,6 +551,11 @@ public class TaskMonitor<T extends Task, SubTaskReportType 
extends SubTaskReport
     {
       return taskHistory;
     }
+
+    Stopwatch getStopwatch()
+    {
+      return stopwatch;
+    }
   }
 
   static class SubTaskCompleteEvent<T extends Task>
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
index fdbb7bb0a52..4b6c64d9189 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
@@ -43,6 +43,7 @@ import org.junit.Test;
 import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -61,7 +62,8 @@ public class TaskMonitorTest
   private final TaskMonitor<TestTask, SimpleSubTaskReport> monitor = new 
TaskMonitor<>(
       new TestOverlordClient(),
       3,
-      SPLIT_NUM
+      SPLIT_NUM,
+      0
   );
 
   @Before
@@ -186,6 +188,26 @@ public class TaskMonitorTest
     }
   }
 
+  @Test
+  public void testTimeout() throws InterruptedException, ExecutionException, 
TimeoutException
+  {
+    TaskMonitor<TestTask, SimpleSubTaskReport> timeoutMonitor = new 
TaskMonitor<>(
+            new TestOverlordClient(),
+            0,
+            1,
+            10L
+    );
+    timeoutMonitor.start(50);
+    ListenableFuture<SubTaskCompleteEvent<TestTask>> future = 
timeoutMonitor.submit(
+            new TestTaskSpec("timeoutSpec", "groupId", "supervisorId", null, 
new IntegerInputSplit(0), 100L, 0, false)
+    );
+    SubTaskCompleteEvent<TestTask> result = future.get(1, TimeUnit.SECONDS);
+    Assert.assertNotNull(result.getLastStatus());
+    Assert.assertEquals(TaskState.FAILED, 
result.getLastStatus().getStatusCode());
+    Assert.assertEquals(TaskState.FAILED, result.getLastState());
+    timeoutMonitor.stop();
+  }
+
   private class TestTaskSpec extends SubTaskSpec<TestTask>
   {
     private final long runTime;
@@ -264,6 +286,8 @@ public class TaskMonitorTest
 
   private class TestOverlordClient extends NoopOverlordClient
   {
+    private final Set<String> cancelled = ConcurrentHashMap.newKeySet();
+
     @Override
     public ListenableFuture<Void> runTask(String taskId, Object taskObject)
     {
@@ -279,6 +303,9 @@ public class TaskMonitorTest
     @Override
     public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
     {
+      final TaskState state = cancelled.contains(taskId)
+              ? TaskState.FAILED
+              : tasks.get(taskId);
       final TaskStatusResponse retVal = new TaskStatusResponse(
           taskId,
           new TaskStatusPlus(
@@ -287,7 +314,7 @@ public class TaskMonitorTest
               "testTask",
               DateTimes.EPOCH,
               DateTimes.EPOCH,
-              tasks.get(taskId),
+              state,
               RunnerTaskState.RUNNING,
               -1L,
               TaskLocation.unknown(),
@@ -298,6 +325,13 @@ public class TaskMonitorTest
 
       return Futures.immediateFuture(retVal);
     }
+
+    @Override
+    public ListenableFuture<Void> cancelTask(String taskId)
+    {
+      cancelled.add(taskId);
+      return Futures.immediateFuture(null);
+    }
   }
 
   private static class IntegerInputSplit extends InputSplit<Integer>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to