This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5b847c86a58 Add task context parameter `subTaskTimeoutMillis` for
index_parallel tasks (#18039)
5b847c86a58 is described below
commit 5b847c86a5820b08cd4510902bc066bb3034e8a4
Author: Misha <[email protected]>
AuthorDate: Wed Jun 4 07:52:36 2025 +0200
Add task context parameter `subTaskTimeoutMillis` for index_parallel tasks
(#18039)
Changes
---------
- Add task context parameter `subTaskTimeoutMillis`
- Update `TaskMonitor` to cancel a sub-task if it exceeds the timeout
- Do not retry a sub-task that has been cancelled due to the timeout
Key benefits
------------
- Fail fast: instead of waiting indefinitely, a stuck sub-task is cancelled
and the parent task fails immediately.
- Free slots: stuck ("zombie") sub-tasks no longer tie up worker slots for hours or days.
- Automate recovery: the resulting task failure triggers existing retry/alert
pipelines, so operators can fix the underlying problem or let automation launch
a fresh attempt.
---
docs/ingestion/tasks.md | 1 +
.../apache/druid/indexing/common/task/Tasks.java | 2 ++
.../batch/parallel/ParallelIndexPhaseRunner.java | 10 +++++-
.../common/task/batch/parallel/TaskMonitor.java | 28 ++++++++++++++--
.../task/batch/parallel/TaskMonitorTest.java | 38 ++++++++++++++++++++--
5 files changed, 73 insertions(+), 6 deletions(-)
diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index 9396378bcdb..2dc82b7dc49 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -472,6 +472,7 @@ The following parameters apply to all task types.
|`useLineageBasedSegmentAllocation`|Enables the new lineage-based segment
allocation protocol for the native Parallel task with dynamic partitioning.
This option should be off during the replacing rolling upgrade from one of the
Druid versions between 0.19 and 0.21 to Druid 0.22 or higher. Once the upgrade
is done, it must be set to `true` to ensure data correctness.|`false` in 0.21
or earlier, `true` in 0.22 or later|
|`lookupLoadingMode`|Controls the lookup loading behavior in tasks. This
property supports three values: `ALL` mode loads all the lookups, `NONE` mode
does not load any lookups and `ONLY_REQUIRED` mode loads the lookups specified
with context key `lookupsToLoad`. This property must not be specified for `MSQ`
and `kill` tasks as the task engine enforces `ONLY_REQUIRED` mode for
`MSQWorkerTask` and `NONE` mode for `MSQControllerTask` and `kill` tasks.|`ALL`|
|`lookupsToLoad`|List of lookup names to load in tasks. This property is
required only if the `lookupLoadingMode` is set to `ONLY_REQUIRED`. For
`MSQWorkerTask` type, the lookup names to load are identified by the controller
task by parsing the SQL. |`null`|
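+|`subTaskTimeoutMillis`|Maximum time (in milliseconds) the parent task waits
for a sub-task to complete before cancelling it. A sub-task cancelled due to
this timeout is counted as failed and is not retried. Applicable only to
`index_parallel` tasks and `compact` tasks (when running in parallel mode). Set
to 0 (or any non-positive value) to disable the timeout.|0 (no timeout)|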
## Task logs
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index 8e62113f0c7..90fb67116b9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -44,6 +44,7 @@ public class Tasks
public static final int DEFAULT_TASK_PRIORITY = 0;
public static final long DEFAULT_LOCK_TIMEOUT_MILLIS =
TimeUnit.MINUTES.toMillis(5);
+ public static final long DEFAULT_SUB_TASK_TIMEOUT_MILLIS = 0;
public static final boolean DEFAULT_FORCE_TIME_CHUNK_LOCK = true;
public static final boolean DEFAULT_STORE_COMPACTION_STATE = false;
public static final boolean DEFAULT_USE_MAX_MEMORY_ESTIMATES = false;
@@ -52,6 +53,7 @@ public class Tasks
public static final String PRIORITY_KEY = "priority";
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
+ public static final String SUB_TASK_TIMEOUT_KEY = "subTaskTimeoutMillis";
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
public static final String STORE_EMPTY_COLUMNS_KEY = "storeEmptyColumns";
public static final String USE_SHARED_LOCK = "useSharedLock";
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
index 579c1e492c3..23e344709fe 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
@@ -29,6 +29,7 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.Tasks;
import
org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.MonitorEntry;
import
org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
import org.apache.druid.java.util.common.ISE;
@@ -123,7 +124,8 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType
extends Task, SubTask
taskMonitor = new TaskMonitor<>(
toolbox.getOverlordClient(),
tuningConfig.getMaxRetry(),
- estimateTotalNumSubTasks()
+ estimateTotalNumSubTasks(),
+ getSubtaskTimeoutMillisFromContext()
);
TaskState state = TaskState.RUNNING;
@@ -472,4 +474,10 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType
extends Task, SubTask
{
return nextSpecId++;
}
+
+ private long getSubtaskTimeoutMillisFromContext()
+ {
+ return ((Number) context.getOrDefault(Tasks.SUB_TASK_TIMEOUT_KEY,
+ Tasks.DEFAULT_SUB_TASK_TIMEOUT_MILLIS)).longValue();
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
index 8db78a10b58..1fd84a36f61 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
@@ -31,6 +31,7 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.rpc.StandardRetryPolicy;
@@ -88,6 +89,7 @@ public class TaskMonitor<T extends Task, SubTaskReportType
extends SubTaskReport
private final OverlordClient overlordClient;
private final int maxRetry;
private final int estimatedNumSucceededTasks;
+ private final long taskTimeoutMillis;
@GuardedBy("taskCountLock")
private int numRunningTasks;
@@ -106,15 +108,15 @@ public class TaskMonitor<T extends Task,
SubTaskReportType extends SubTaskReport
@GuardedBy("startStopLock")
private boolean running = false;
- TaskMonitor(OverlordClient overlordClient, int maxRetry, int
estimatedNumSucceededTasks)
+ TaskMonitor(OverlordClient overlordClient, int maxRetry, int
estimatedNumSucceededTasks, long taskTimeoutMillis)
{
// Unlimited retries for Overlord APIs: if it goes away, we'll wait
indefinitely for it to come back.
this.overlordClient = Preconditions.checkNotNull(overlordClient,
"overlordClient")
.withRetryPolicy(StandardRetryPolicy.unlimited());
this.maxRetry = maxRetry;
this.estimatedNumSucceededTasks = estimatedNumSucceededTasks;
-
- log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]",
estimatedNumSucceededTasks);
+ this.taskTimeoutMillis = taskTimeoutMillis;
+ log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]
and sub-task timeout[%d millis].", estimatedNumSucceededTasks,
taskTimeoutMillis);
}
public void start(long taskStatusCheckingPeriod)
@@ -134,6 +136,19 @@ public class TaskMonitor<T extends Task, SubTaskReportType
extends SubTaskReport
final MonitorEntry monitorEntry = entry.getValue();
final String taskId = monitorEntry.runningTask.getId();
+ final long elapsed =
monitorEntry.getStopwatch().millisElapsed();
+ if (taskTimeoutMillis > 0 && elapsed > taskTimeoutMillis) {
+ log.warn("Cancelling task[%s] as it has already run for [%d]
millis (taskTimeoutMillis=[%d]).", taskId, elapsed, taskTimeoutMillis);
+ FutureUtils.getUnchecked(overlordClient.cancelTask(taskId),
true);
+ final TaskStatusPlus cancelledTaskStatus =
FutureUtils.getUnchecked(
+ overlordClient.taskStatus(taskId), true).getStatus();
+ reportsMap.remove(taskId);
+ incrementNumFailedTasks();
+ monitorEntry.setLastStatus(cancelledTaskStatus);
+ iterator.remove();
+ continue;
+ }
+
// Could improve this by switching to the bulk taskStatuses
API.
final TaskStatusResponse taskStatusResponse =
FutureUtils.getUnchecked(overlordClient.taskStatus(taskId), true);
@@ -441,6 +456,7 @@ public class TaskMonitor<T extends Task, SubTaskReportType
extends SubTaskReport
// old tasks to recent tasks. running task is not included
private final CopyOnWriteArrayList<TaskStatusPlus> taskHistory;
private final SettableFuture<SubTaskCompleteEvent<T>> completeEventFuture;
+ private final Stopwatch stopwatch;
/**
* This variable is updated inside of the {@link
java.util.concurrent.Callable} executed by
@@ -472,6 +488,7 @@ public class TaskMonitor<T extends Task, SubTaskReportType
extends SubTaskReport
this.runningStatus = runningStatus;
this.taskHistory = taskHistory;
this.completeEventFuture = completeEventFuture;
+ this.stopwatch = Stopwatch.createStarted();
}
MonitorEntry withNewRunningTask(T newTask, @Nullable TaskStatusPlus
newStatus, TaskStatusPlus statusOfLastTask)
@@ -534,6 +551,11 @@ public class TaskMonitor<T extends Task, SubTaskReportType
extends SubTaskReport
{
return taskHistory;
}
+
+ Stopwatch getStopwatch()
+ {
+ return stopwatch;
+ }
}
static class SubTaskCompleteEvent<T extends Task>
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
index fdbb7bb0a52..4b6c64d9189 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
@@ -43,6 +43,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -61,7 +62,8 @@ public class TaskMonitorTest
private final TaskMonitor<TestTask, SimpleSubTaskReport> monitor = new
TaskMonitor<>(
new TestOverlordClient(),
3,
- SPLIT_NUM
+ SPLIT_NUM,
+ 0
);
@Before
@@ -186,6 +188,26 @@ public class TaskMonitorTest
}
}
+ @Test
+ public void testTimeout() throws InterruptedException, ExecutionException,
TimeoutException
+ {
+ TaskMonitor<TestTask, SimpleSubTaskReport> timeoutMonitor = new
TaskMonitor<>(
+ new TestOverlordClient(),
+ 0,
+ 1,
+ 10L
+ );
+ timeoutMonitor.start(50);
+ ListenableFuture<SubTaskCompleteEvent<TestTask>> future =
timeoutMonitor.submit(
+ new TestTaskSpec("timeoutSpec", "groupId", "supervisorId", null,
new IntegerInputSplit(0), 100L, 0, false)
+ );
+ SubTaskCompleteEvent<TestTask> result = future.get(1, TimeUnit.SECONDS);
+ Assert.assertNotNull(result.getLastStatus());
+ Assert.assertEquals(TaskState.FAILED,
result.getLastStatus().getStatusCode());
+ Assert.assertEquals(TaskState.FAILED, result.getLastState());
+ timeoutMonitor.stop();
+ }
+
private class TestTaskSpec extends SubTaskSpec<TestTask>
{
private final long runTime;
@@ -264,6 +286,8 @@ public class TaskMonitorTest
private class TestOverlordClient extends NoopOverlordClient
{
+ private final Set<String> cancelled = ConcurrentHashMap.newKeySet();
+
@Override
public ListenableFuture<Void> runTask(String taskId, Object taskObject)
{
@@ -279,6 +303,9 @@ public class TaskMonitorTest
@Override
public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
{
+ final TaskState state = cancelled.contains(taskId)
+ ? TaskState.FAILED
+ : tasks.get(taskId);
final TaskStatusResponse retVal = new TaskStatusResponse(
taskId,
new TaskStatusPlus(
@@ -287,7 +314,7 @@ public class TaskMonitorTest
"testTask",
DateTimes.EPOCH,
DateTimes.EPOCH,
- tasks.get(taskId),
+ state,
RunnerTaskState.RUNNING,
-1L,
TaskLocation.unknown(),
@@ -298,6 +325,13 @@ public class TaskMonitorTest
return Futures.immediateFuture(retVal);
}
+
+ @Override
+ public ListenableFuture<Void> cancelTask(String taskId)
+ {
+ cancelled.add(taskId);
+ return Futures.immediateFuture(null);
+ }
}
private static class IntegerInputSplit extends InputSplit<Integer>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]