This is an automated email from the ASF dual-hosted git repository.

tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ec6038f4ec6 Fix activeTask check race in TaskLockbox (#18463)
ec6038f4ec6 is described below

commit ec6038f4ec6af96220a54105ad3f5ea9575cf76a
Author: jtuglu1 <[email protected]>
AuthorDate: Mon Sep 1 23:29:46 2025 -0700

    Fix activeTask check race in TaskLockbox (#18463)
    
    Currently, we do an unprotected read to check whether a task exists in the
    activeTasks set. This can race with other threads calling remove(), leading
    to undesirable outcomes where task locks are allocated to killed tasks,
    blocking other compaction and kill tasks.
---
 .../druid/indexing/overlord/TaskLockbox.java       |  16 +++-
 .../overlord/TaskLockBoxConcurrencyTest.java       | 105 +++++++++++++++++++++
 2 files changed, 118 insertions(+), 3 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index c7161b75463..b36d3e96232 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -495,10 +495,9 @@ public class TaskLockbox
     final boolean isTimeChunkLock = lockGranularity == 
LockGranularity.TIME_CHUNK;
 
     final AllocationHolderList holderList = new AllocationHolderList(requests, 
interval);
-    holderList.getPending().forEach(this::verifyTaskIsActive);
-
     giant.lock();
     try {
+      holderList.getPending().forEach(this::verifyTaskIsActive);
       if (isTimeChunkLock) {
         // For time-chunk locking, segment must be allocated only after 
acquiring the lock
         holderList.getPending().forEach(holder -> acquireTaskLock(holder, 
true));
@@ -525,6 +524,7 @@ public class TaskLockbox
   /**
    * Marks the segment allocation as failed if the underlying task is not 
active.
    */
+  @GuardedBy("giant")
   private void verifyTaskIsActive(SegmentAllocationHolder holder)
   {
     final String taskId = holder.task.getId();
@@ -567,6 +567,10 @@ public class TaskLockbox
     giant.lock();
 
     try {
+      if (!activeTasks.contains(task.getId())) {
+        throw new ISE("Unable to grant LockPosse to inactive Task [%s]", 
task.getId());
+      }
+
       final TaskLockPosse posseToUse;
       final List<TaskLockPosse> foundPosses = findLockPossesOverlapsInterval(
           request.getInterval()
@@ -1386,7 +1390,13 @@ public class TaskLockbox
   @VisibleForTesting
   Set<String> getActiveTasks()
   {
-    return activeTasks;
+    giant.lock();
+    try {
+      return activeTasks;
+    }
+    finally {
+      giant.unlock();
+    }
   }
 
   @VisibleForTesting
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
index fffa90bf732..723af0e721d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
@@ -23,13 +23,18 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.common.guava.SettableSupplier;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
+import org.apache.druid.indexing.common.actions.SegmentAllocateResult;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.TestDerbyConnector;
@@ -50,9 +55,11 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TaskLockBoxConcurrencyTest
 {
@@ -70,6 +77,9 @@ public class TaskLockBoxConcurrencyTest
   {
     final TestDerbyConnector derbyConnector = derby.getConnector();
     derbyConnector.createTaskTables();
+    derbyConnector.createPendingSegmentsTable();
+    derbyConnector.createUpgradeSegmentsTable();
+    derbyConnector.createSegmentTable();
     taskStorage = new MetadataTaskStorage(
         derbyConnector,
         new TaskStorageConfig(null),
@@ -268,4 +278,99 @@ public class TaskLockBoxConcurrencyTest
     Assert.assertEquals(1, future1.get().intValue());
     Assert.assertEquals(2, future2.get().intValue());
   }
+
+  @Test(timeout = 60_000L)
+  public void testConcurrentRemoveAndAllocateSegmentsForSameTask() throws 
Exception
+  {
+    final Task task = NoopTask.create();
+    final Interval interval = Intervals.of("2024-01-01/2024-01-02");
+    lockbox.add(task);
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+    final AtomicReference<SegmentAllocateResult> allocationResultRef = new 
AtomicReference<>();
+
+    Thread allocationThread = new Thread(() -> {
+      try {
+        barrier.await();
+        SegmentAllocateAction action = new SegmentAllocateAction(
+            task.getDataSource(),
+            interval.getStart(),
+            Granularities.DAY,
+            Granularities.DAY,
+            "sequence",
+            null,
+            false,
+            null,
+            LockGranularity.TIME_CHUNK,
+            TaskLockType.EXCLUSIVE
+        );
+        SegmentAllocateRequest request = new SegmentAllocateRequest(task, 
action, 1);
+        List<SegmentAllocateResult> results = lockbox.allocateSegments(
+            Collections.singletonList(request),
+            task.getDataSource(),
+            interval,
+            false,
+            LockGranularity.TIME_CHUNK,
+            false
+        );
+        allocationResultRef.set(results.get(0));
+      }
+      catch (Throwable t) {
+        throw new RuntimeException(t);
+      }
+    });
+
+    Thread removeThread = new Thread(() -> {
+      try {
+        barrier.await();
+        lockbox.remove(task);
+      }
+      catch (Throwable t) {
+        throw new RuntimeException(t);
+      }
+    });
+
+    allocationThread.start();
+    removeThread.start();
+    allocationThread.join();
+    removeThread.join();
+
+    final SegmentAllocateResult result = allocationResultRef.get();
+
+    // Only two valid outcomes:
+    // 1. Allocation succeeded because it won the giant lock race, and then 
remove happened after.
+    // 2. Allocation failed because remove won the giant race and marked the 
task inactive.
+    if (result.isSuccess()) {
+      // Allocation succeeded, now further allocation must fail since the task 
has been removed
+      SegmentAllocateAction action2 = new SegmentAllocateAction(
+          task.getDataSource(),
+          interval.getStart(),
+          Granularities.DAY,
+          Granularities.DAY,
+          "sequence2",
+          null,
+          false,
+          null,
+          LockGranularity.TIME_CHUNK,
+          TaskLockType.EXCLUSIVE
+      );
+      SegmentAllocateRequest request2 = new SegmentAllocateRequest(task, 
action2, 1);
+      List<SegmentAllocateResult> results2 = lockbox.allocateSegments(
+          Collections.singletonList(request2),
+          task.getDataSource(),
+          interval,
+          false,
+          LockGranularity.TIME_CHUNK,
+          false
+      );
+      SegmentAllocateResult result2 = results2.get(0);
+      Assert.assertNull("Subsequent allocation after removal must fail", 
result2.getSegmentId());
+      Assert.assertFalse(result2.isSuccess());
+      Assert.assertTrue(result2.getErrorMessage().contains("Unable to grant 
lock to inactive Task"));
+    } else {
+      Assert.assertTrue(result.getErrorMessage().contains("Unable to grant 
lock to inactive Task"));
+    }
+    Assert.assertTrue(lockbox.getAllLocks().isEmpty());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to