This is an automated email from the ASF dual-hosted git repository.
tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ec6038f4ec6 Fix activeTask check race in TaskLockbox (#18463)
ec6038f4ec6 is described below
commit ec6038f4ec6af96220a54105ad3f5ea9575cf76a
Author: jtuglu1 <[email protected]>
AuthorDate: Mon Sep 1 23:29:46 2025 -0700
Fix activeTask check race in TaskLockbox (#18463)
Currently, we do an unprotected read to check whether a task exists in the
activeTasks set. This can race with other threads calling remove(), leading
to undesirable outcomes where task locks are allocated to killed tasks,
blocking other compaction and kill tasks.
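
To make the race concrete, here is a minimal, self-contained Java sketch of
the check-then-act bug described above and of the fix applied in the diff
below. The names giant and activeTasks mirror the TaskLockbox fields being
patched; the class and method names are hypothetical and not part of the
commit.

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

class ActiveTaskRaceSketch
{
  private final ReentrantLock giant = new ReentrantLock();
  private final Set<String> activeTasks = new HashSet<>();

  // Racy variant: the membership check runs before the lock is taken, so a
  // concurrent remove() can delete the task between the check and the
  // critical section, and we allocate a lock for a killed task.
  void allocateRacy(String taskId)
  {
    if (!activeTasks.contains(taskId)) { // unprotected read
      throw new IllegalStateException("inactive task: " + taskId);
    }
    giant.lock();
    try {
      // ... allocate the task lock; the task may already be gone here ...
    }
    finally {
      giant.unlock();
    }
  }

  // Fixed variant: the check moves inside the critical section, making it
  // atomic with respect to remove(), which mutates activeTasks under giant.
  void allocateSafe(String taskId)
  {
    giant.lock();
    try {
      if (!activeTasks.contains(taskId)) {
        throw new IllegalStateException("inactive task: " + taskId);
      }
      // ... allocate the task lock while the task is guaranteed active ...
    }
    finally {
      giant.unlock();
    }
  }

  void remove(String taskId)
  {
    giant.lock();
    try {
      activeTasks.remove(taskId);
      // ... release this task's locks ...
    }
    finally {
      giant.unlock();
    }
  }
}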
---
.../druid/indexing/overlord/TaskLockbox.java | 16 +++-
.../overlord/TaskLockBoxConcurrencyTest.java | 105 +++++++++++++++++++++
2 files changed, 118 insertions(+), 3 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index c7161b75463..b36d3e96232 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -495,10 +495,9 @@ public class TaskLockbox
final boolean isTimeChunkLock = lockGranularity == LockGranularity.TIME_CHUNK;
final AllocationHolderList holderList = new AllocationHolderList(requests, interval);
- holderList.getPending().forEach(this::verifyTaskIsActive);
-
giant.lock();
try {
+ holderList.getPending().forEach(this::verifyTaskIsActive);
if (isTimeChunkLock) {
// For time-chunk locking, segment must be allocated only after acquiring the lock
holderList.getPending().forEach(holder -> acquireTaskLock(holder, true));
@@ -525,6 +524,7 @@ public class TaskLockbox
/**
* Marks the segment allocation as failed if the underlying task is not active.
*/
+ @GuardedBy("giant")
private void verifyTaskIsActive(SegmentAllocationHolder holder)
{
final String taskId = holder.task.getId();
@@ -567,6 +567,10 @@ public class TaskLockbox
giant.lock();
try {
+ if (!activeTasks.contains(task.getId())) {
+ throw new ISE("Unable to grant LockPosse to inactive Task [%s]", task.getId());
+ }
+
final TaskLockPosse posseToUse;
final List<TaskLockPosse> foundPosses = findLockPossesOverlapsInterval(
request.getInterval()
@@ -1386,7 +1390,13 @@ public class TaskLockbox
@VisibleForTesting
Set<String> getActiveTasks()
{
- return activeTasks;
+ giant.lock();
+ try {
+ return activeTasks;
+ }
+ finally {
+ giant.unlock();
+ }
}
@VisibleForTesting
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
index fffa90bf732..723af0e721d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java
@@ -23,13 +23,18 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
+import org.apache.druid.indexing.common.actions.SegmentAllocateResult;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
@@ -50,9 +55,11 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
public class TaskLockBoxConcurrencyTest
{
@@ -70,6 +77,9 @@ public class TaskLockBoxConcurrencyTest
{
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createTaskTables();
+ derbyConnector.createPendingSegmentsTable();
+ derbyConnector.createUpgradeSegmentsTable();
+ derbyConnector.createSegmentTable();
taskStorage = new MetadataTaskStorage(
derbyConnector,
new TaskStorageConfig(null),
@@ -268,4 +278,99 @@ public class TaskLockBoxConcurrencyTest
Assert.assertEquals(1, future1.get().intValue());
Assert.assertEquals(2, future2.get().intValue());
}
+
+ @Test(timeout = 60_000L)
+ public void testConcurrentRemoveAndAllocateSegmentsForSameTask() throws Exception
+ {
+ final Task task = NoopTask.create();
+ final Interval interval = Intervals.of("2024-01-01/2024-01-02");
+ lockbox.add(task);
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ final AtomicReference<SegmentAllocateResult> allocationResultRef = new AtomicReference<>();
+
+ Thread allocationThread = new Thread(() -> {
+ try {
+ barrier.await();
+ SegmentAllocateAction action = new SegmentAllocateAction(
+ task.getDataSource(),
+ interval.getStart(),
+ Granularities.DAY,
+ Granularities.DAY,
+ "sequence",
+ null,
+ false,
+ null,
+ LockGranularity.TIME_CHUNK,
+ TaskLockType.EXCLUSIVE
+ );
+ SegmentAllocateRequest request = new SegmentAllocateRequest(task, action, 1);
+ List<SegmentAllocateResult> results = lockbox.allocateSegments(
+ Collections.singletonList(request),
+ task.getDataSource(),
+ interval,
+ false,
+ LockGranularity.TIME_CHUNK,
+ false
+ );
+ allocationResultRef.set(results.get(0));
+ }
+ catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ });
+
+ Thread removeThread = new Thread(() -> {
+ try {
+ barrier.await();
+ lockbox.remove(task);
+ }
+ catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ });
+
+ allocationThread.start();
+ removeThread.start();
+ allocationThread.join();
+ removeThread.join();
+
+ final SegmentAllocateResult result = allocationResultRef.get();
+
+ // Only two valid outcomes:
+ // 1. Allocation succeeded because it won the giant lock race, and then remove happened after.
+ // 2. Allocation failed because remove won the giant race and marked the task inactive.
+ if (result.isSuccess()) {
+ // Allocation succeeded, now further allocation must fail since the task has been removed
+ SegmentAllocateAction action2 = new SegmentAllocateAction(
+ task.getDataSource(),
+ interval.getStart(),
+ Granularities.DAY,
+ Granularities.DAY,
+ "sequence2",
+ null,
+ false,
+ null,
+ LockGranularity.TIME_CHUNK,
+ TaskLockType.EXCLUSIVE
+ );
+ SegmentAllocateRequest request2 = new SegmentAllocateRequest(task, action2, 1);
+ List<SegmentAllocateResult> results2 = lockbox.allocateSegments(
+ Collections.singletonList(request2),
+ task.getDataSource(),
+ interval,
+ false,
+ LockGranularity.TIME_CHUNK,
+ false
+ );
+ SegmentAllocateResult result2 = results2.get(0);
+ Assert.assertNull("Subsequent allocation after removal must fail", result2.getSegmentId());
+ Assert.assertFalse(result2.isSuccess());
+ Assert.assertTrue(result2.getErrorMessage().contains("Unable to grant lock to inactive Task"));
+ } else {
+ Assert.assertTrue(result.getErrorMessage().contains("Unable to grant lock to inactive Task"));
+ }
+ Assert.assertTrue(lockbox.getAllLocks().isEmpty());
+ }
}
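
A note on the test pattern above: because both threads are released by the
same CyclicBarrier, either one can win the race on the giant lock, so the
assertions must accept both interleavings and branch on the observed outcome.
A generic sketch of that barrier-and-branch pattern follows; all names here
are hypothetical and no Druid types are involved.

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

class EitherOutcomeRaceSketch
{
  public static void main(String[] args) throws Exception
  {
    final CyclicBarrier barrier = new CyclicBarrier(2);
    final AtomicReference<String> outcome = new AtomicReference<>();

    // Two threads start together; exactly one of two interleavings happens.
    Thread first = new Thread(() -> {
      try {
        barrier.await();
        outcome.compareAndSet(null, "first won");
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
    Thread second = new Thread(() -> {
      try {
        barrier.await();
        outcome.compareAndSet(null, "second won");
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
    });

    first.start();
    second.start();
    first.join();
    second.join();

    // Assert only invariants that hold for both outcomes, then branch on the
    // winner for outcome-specific checks, as the test above does.
    String winner = outcome.get();
    if (!"first won".equals(winner) && !"second won".equals(winner)) {
      throw new AssertionError("unexpected outcome: " + winner);
    }
    System.out.println(winner);
  }
}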
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]