This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b1f1fc61dc Fix race condition in TaskQueue.removeTaskInternal (#18031)
2b1f1fc61dc is described below

commit 2b1f1fc61dc52bbf174f66da0d994e409a88a3a7
Author: jtuglu-netflix <[email protected]>
AuthorDate: Sat May 24 01:17:42 2025 -0700

    Fix race condition in TaskQueue.removeTaskInternal (#18031)
    
    Changes:
    - Fix race condition in TaskQueue.removeTaskInternal
    - syncFromStorage should remove task even if it has a more recent update time but is already complete
    - Exclude complete tasks from waiting task count
---
 .../apache/druid/indexing/overlord/TaskQueue.java  | 18 ++++--
 .../overlord/TaskQueueConcurrencyTest.java         | 72 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 6 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 517835c0090..50eb61d2866 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -577,15 +577,15 @@ public class TaskQueue
    * might add it back to the queue, thus causing a duplicate run of the task.
    */
   @GuardedBy("startStopLock")
-  private void removeTaskInternal(final String taskId, final DateTime 
deleteTime)
+  private boolean removeTaskInternal(final String taskId, final DateTime 
deleteTime)
   {
     final AtomicReference<Task> removedTask = new AtomicReference<>();
 
     addOrUpdateTaskEntry(
         taskId,
         prevEntry -> {
-          // Remove the task only if it doesn't have a more recent update
-          if (prevEntry != null && 
prevEntry.lastUpdatedTime.isBefore(deleteTime)) {
+          // Remove the task only if it is complete OR it doesn't have a more 
recent update
+          if (prevEntry != null && (prevEntry.isComplete || 
prevEntry.lastUpdatedTime.isBefore(deleteTime))) {
             removedTask.set(prevEntry.task);
             // Remove this taskId from activeTasks by mapping it to null
             return null;
@@ -597,7 +597,9 @@ public class TaskQueue
 
     if (removedTask.get() != null) {
       removeTaskLock(removedTask.get());
+      return true;
     }
+    return false;
   }
 
   /**
@@ -851,8 +853,11 @@ public class TaskQueue
         final Collection<Task> removedTasks = 
mapDifference.entriesOnlyOnLeft().values();
 
         // Remove tasks not present in metadata store if their lastUpdatedTime 
is before syncStartTime
+        int numTasksRemoved = 0;
         for (Task task : removedTasks) {
-          removeTaskInternal(task.getId(), syncStartTime);
+          if (removeTaskInternal(task.getId(), syncStartTime)) {
+            ++numTasksRemoved;
+          }
         }
 
         // Add new tasks present in metadata store if their lastUpdatedTime is 
before syncStartTime
@@ -861,8 +866,8 @@ public class TaskQueue
         }
 
         log.info(
-            "Synced [%d] tasks from storage (%d tasks added, %d tasks 
removed).",
-            newTasks.size(), addedTasks.size(), removedTasks.size()
+            "Synced [%d] tasks from storage (%d tasks added, %d tasks 
removable, %d tasks removed).",
+            newTasks.size(), addedTasks.size(), removedTasks.size(), 
numTasksRemoved
         );
         requestManagement();
       } else {
@@ -958,6 +963,7 @@ public class TaskQueue
                                                .collect(Collectors.toSet());
 
     return activeTasks.values().stream()
+                      .filter(entry -> !entry.isComplete)
                       .map(entry -> entry.task)
                       .filter(task -> 
!runnerKnownTaskIds.contains(task.getId()))
                       .collect(Collectors.toMap(Task::getDataSource, task -> 
1L, Long::sum));
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
index fc80c3f94c0..444b468b042 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
@@ -400,6 +400,78 @@ public class TaskQueueConcurrencyTest extends 
IngestionTestBase
     );
   }
 
+  @Test(timeout = 20_000L)
+  public void 
test_shutdown_then_manageQueuedTasks_blocks_syncFromStorage_and_forcesTaskRemoval()
+  {
+    taskQueue.setActive(true);
+
+    final Task task1 = createTask("t1");
+    taskQueue.add(task1);
+
+    // shutdown the task ahead of time to mark it as isComplete
+    taskQueue.shutdown(task1.getId(), "shutdown");
+
+    // verify that manageQueuedTasks() called before syncFromStorage() forces the sync to block
+    // but ensures that syncFromStorage() is able to remove the task
+    ActionVerifier.verifyThat(
+        update(
+            () -> taskQueue.manageQueuedTasks()
+        ).withEndState(
+            () -> Assert.assertEquals(
+                Optional.of(TaskStatus.failure(task1.getId(), "shutdown")),
+                taskQueue.getTaskStatus(task1.getId())
+            )
+        )
+    ).blocks(
+        update(
+            () -> taskQueue.syncFromStorage()
+        ).withEndState(
+            () -> Assert.assertEquals(
+                Optional.absent(),
+                taskQueue.getActiveTask(task1.getId())
+            )
+        )
+    );
+
+    Assert.assertEquals(Optional.absent(), 
taskQueue.getActiveTask(task1.getId()));
+  }
+
+  @Test(timeout = 20_000L)
+  public void 
test_shutdown_then_syncFromStorage_blocks_manageQueuedTasks_and_forcesTaskRemoval()
+  {
+    taskQueue.setActive(true);
+
+    final Task task1 = createTask("t1");
+    taskQueue.add(task1);
+
+    // shutdown the task ahead of time to mark it as isComplete
+    taskQueue.shutdown(task1.getId(), "shutdown");
+
+    // verify that syncFromStorage() called before manageQueuedTasks() forces manageQueuedTasks() to block
+    // but ensures that syncFromStorage() is able to remove the task
+    ActionVerifier.verifyThat(
+        update(
+            () -> taskQueue.syncFromStorage()
+        ).withEndState(
+            () -> Assert.assertEquals(
+                Optional.absent(),
+                taskQueue.getActiveTask(task1.getId())
+            )
+        )
+    ).blocks(
+        update(
+            () -> taskQueue.manageQueuedTasks()
+        ).withEndState(
+            () -> Assert.assertEquals(
+                Optional.absent(),
+                taskQueue.getActiveTask(task1.getId())
+            )
+        )
+    );
+
+    Assert.assertEquals(Optional.absent(), 
taskQueue.getActiveTask(task1.getId()));
+  }
+
   private UpdateAction update(Action action)
   {
     return new UpdateAction(action, threadToUpdateAction::put);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to