This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2b1f1fc61dc Fix race condition in TaskQueue.removeTaskInternal (#18031)
2b1f1fc61dc is described below
commit 2b1f1fc61dc52bbf174f66da0d994e409a88a3a7
Author: jtuglu-netflix <[email protected]>
AuthorDate: Sat May 24 01:17:42 2025 -0700
Fix race condition in TaskQueue.removeTaskInternal (#18031)
Changes:
- Fix race condition in TaskQueue.removeTaskInternal
- syncFromStorage should remove task even if it has a more recent update
time but is already complete
- Exclude complete tasks from waiting task count
---
.../apache/druid/indexing/overlord/TaskQueue.java | 18 ++++--
.../overlord/TaskQueueConcurrencyTest.java | 72 ++++++++++++++++++++++
2 files changed, 84 insertions(+), 6 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 517835c0090..50eb61d2866 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -577,15 +577,15 @@ public class TaskQueue
* might add it back to the queue, thus causing a duplicate run of the task.
*/
@GuardedBy("startStopLock")
- private void removeTaskInternal(final String taskId, final DateTime
deleteTime)
+ private boolean removeTaskInternal(final String taskId, final DateTime
deleteTime)
{
final AtomicReference<Task> removedTask = new AtomicReference<>();
addOrUpdateTaskEntry(
taskId,
prevEntry -> {
- // Remove the task only if it doesn't have a more recent update
- if (prevEntry != null &&
prevEntry.lastUpdatedTime.isBefore(deleteTime)) {
+ // Remove the task only if it is complete OR it doesn't have a more
recent update
+ if (prevEntry != null && (prevEntry.isComplete ||
prevEntry.lastUpdatedTime.isBefore(deleteTime))) {
removedTask.set(prevEntry.task);
// Remove this taskId from activeTasks by mapping it to null
return null;
@@ -597,7 +597,9 @@ public class TaskQueue
if (removedTask.get() != null) {
removeTaskLock(removedTask.get());
+ return true;
}
+ return false;
}
/**
@@ -851,8 +853,11 @@ public class TaskQueue
final Collection<Task> removedTasks =
mapDifference.entriesOnlyOnLeft().values();
// Remove tasks not present in metadata store if their lastUpdatedTime
is before syncStartTime
+ int numTasksRemoved = 0;
for (Task task : removedTasks) {
- removeTaskInternal(task.getId(), syncStartTime);
+ if (removeTaskInternal(task.getId(), syncStartTime)) {
+ ++numTasksRemoved;
+ }
}
// Add new tasks present in metadata store if their lastUpdatedTime is
before syncStartTime
@@ -861,8 +866,8 @@ public class TaskQueue
}
log.info(
- "Synced [%d] tasks from storage (%d tasks added, %d tasks
removed).",
- newTasks.size(), addedTasks.size(), removedTasks.size()
+ "Synced [%d] tasks from storage (%d tasks added, %d tasks
removable, %d tasks removed).",
+ newTasks.size(), addedTasks.size(), removedTasks.size(),
numTasksRemoved
);
requestManagement();
} else {
@@ -958,6 +963,7 @@ public class TaskQueue
.collect(Collectors.toSet());
return activeTasks.values().stream()
+ .filter(entry -> !entry.isComplete)
.map(entry -> entry.task)
.filter(task ->
!runnerKnownTaskIds.contains(task.getId()))
.collect(Collectors.toMap(Task::getDataSource, task ->
1L, Long::sum));
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
index fc80c3f94c0..444b468b042 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
@@ -400,6 +400,78 @@ public class TaskQueueConcurrencyTest extends
IngestionTestBase
);
}
+ @Test(timeout = 20_000L)
+ public void
test_shutdown_then_manageQueuedTasks_blocks_syncFromStorage_and_forcesTaskRemoval()
+ {
+ taskQueue.setActive(true);
+
+ final Task task1 = createTask("t1");
+ taskQueue.add(task1);
+
+ // shutdown the task ahead of time to mark it as isComplete
+ taskQueue.shutdown(task1.getId(), "shutdown");
+
+ // verify that manageQueuedTasks() called before syncFromStorage() forces
the sync to block
+ // but ensures that syncFromStorage() is able to remove the task
+ ActionVerifier.verifyThat(
+ update(
+ () -> taskQueue.manageQueuedTasks()
+ ).withEndState(
+ () -> Assert.assertEquals(
+ Optional.of(TaskStatus.failure(task1.getId(), "shutdown")),
+ taskQueue.getTaskStatus(task1.getId())
+ )
+ )
+ ).blocks(
+ update(
+ () -> taskQueue.syncFromStorage()
+ ).withEndState(
+ () -> Assert.assertEquals(
+ Optional.absent(),
+ taskQueue.getActiveTask(task1.getId())
+ )
+ )
+ );
+
+ Assert.assertEquals(Optional.absent(),
taskQueue.getActiveTask(task1.getId()));
+ }
+
+ @Test(timeout = 20_000L)
+ public void
test_shutdown_then_syncFromStorage_blocks_manageQueuedTasks_and_forcesTaskRemoval()
+ {
+ taskQueue.setActive(true);
+
+ final Task task1 = createTask("t1");
+ taskQueue.add(task1);
+
+ // shutdown the task ahead of time to mark it as isComplete
+ taskQueue.shutdown(task1.getId(), "shutdown");
+
+ // verify that syncFromStorage() called before manageQueuedTasks() forces
manageQueuedTasks() to block
+ // and ensures that syncFromStorage() is still able to remove the task
+ ActionVerifier.verifyThat(
+ update(
+ () -> taskQueue.syncFromStorage()
+ ).withEndState(
+ () -> Assert.assertEquals(
+ Optional.absent(),
+ taskQueue.getActiveTask(task1.getId())
+ )
+ )
+ ).blocks(
+ update(
+ () -> taskQueue.manageQueuedTasks()
+ ).withEndState(
+ () -> Assert.assertEquals(
+ Optional.absent(),
+ taskQueue.getActiveTask(task1.getId())
+ )
+ )
+ );
+
+ Assert.assertEquals(Optional.absent(),
taskQueue.getActiveTask(task1.getId()));
+ }
+
private UpdateAction update(Action action)
{
return new UpdateAction(action, threadToUpdateAction::put);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]