This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b9c6fdc12c3 Add more concurrency unit tests for TaskQueue (#18022)
b9c6fdc12c3 is described below
commit b9c6fdc12c330c77298e2f6e915a28eb18126ff9
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed May 21 19:00:26 2025 +0530
Add more concurrency unit tests for TaskQueue (#18022)
---
.../overlord/TaskQueueConcurrencyTest.java | 137 +++++++++++++++++++--
1 file changed, 130 insertions(+), 7 deletions(-)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
index bcd75090f00..fc80c3f94c0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
@@ -121,7 +121,7 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
};
}
- @Test
+ @Test(timeout = 20_000L)
public void test_start_blocks_add_forAnyTaskId()
{
// Add task1 to storage and mark it as running
@@ -145,7 +145,7 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
);
}
- @Test
+ @Test(timeout = 20_000L)
public void test_add_blocks_stop()
{
taskQueue.setActive(true);
@@ -166,7 +166,7 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
);
}
- @Test
+ @Test(timeout = 20_000L)
public void test_add_blocks_syncFromStorage_forSameTaskId()
{
taskQueue.setActive(true);
@@ -189,7 +189,7 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
);
}
- @Test
+ @Test(timeout = 20_000L)
public void test_syncFromStorage_blocks_add_forSameTaskId()
{
final String taskId = "t2";
@@ -309,6 +309,97 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
);
}
+ @Test(timeout = 20_000L)
+ public void test_add_doesNotBlock_add_forDifferentTaskId()
+ {
+ taskQueue.setActive(true);
+
+ final Task task1 = createTask("t1");
+ final Task task2 = createTask("t2");
+
+ ActionVerifier.verifyThat(
+ update(
+ () -> taskQueue.add(task1)
+ ).withEndState(
+ () -> Assert.assertEquals(
+ Optional.of(task1),
+ taskQueue.getActiveTask(task1.getId())
+ )
+ )
+ ).doesNotBlock(
+ update(
+ () -> taskQueue.add(task2)
+ ).withEndState(
+ () -> Assert.assertEquals(
+ Optional.of(task2),
+ taskQueue.getActiveTask(task2.getId())
+ )
+ )
+ );
+ }
+
+ @Test(timeout = 20_000L)
+ public void test_add_doesNotBlock_shutdown_forDifferentTaskId()
+ {
+ taskQueue.setActive(true);
+
+ final Task task1 = createTask("t1");
+ taskQueue.add(task1);
+
+ final Task task2 = createTask("t2");
+
+ ActionVerifier.verifyThat(
+ update(
+ () -> taskQueue.add(task2)
+ ).withEndState(
+ () -> Assert.assertEquals(
+ Optional.of(task2),
+ taskQueue.getActiveTask(task2.getId())
+ )
+ )
+ ).doesNotBlock(
+ update(
+ () -> taskQueue.shutdown(task1.getId(), "killed")
+ ).withEndState(
+ () -> Assert.assertEquals(
+ Optional.of(TaskStatus.failure(task1.getId(), "killed")),
+ taskQueue.getTaskStatus(task1.getId())
+ )
+ )
+ );
+ }
+
+ @Test(timeout = 20_000L)
+ public void test_shutdown_doesNotBlock_add_forDifferentTaskId()
+ {
+ taskQueue.setActive(true);
+
+ final Task task1 = createTask("t1");
+ taskQueue.add(task1);
+
+ final Task task2 = createTask("t2");
+
+ ActionVerifier.verifyThat(
+ update(
+ () -> taskQueue.shutdown(task1.getId(), "killed")
+ ).withEndState(
+ () -> Assert.assertEquals(
+ Optional.of(TaskStatus.failure(task1.getId(), "killed")),
+ taskQueue.getTaskStatus(task1.getId())
+ )
+ )
+ ).doesNotBlock(
+ update(
+ () -> taskQueue.add(task2)
+ ).withEndState(
+ () -> Assert.assertEquals(
+ Optional.of(task2),
+ taskQueue.getActiveTask(task2.getId())
+ )
+ )
+ );
+ }
+
private UpdateAction update(Action action)
{
return new UpdateAction(action, threadToUpdateAction::put);
@@ -454,8 +545,8 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
}
/**
- * Verifies that the critical update section of {@code operation1} completely
- * blocks the critical update in {@code operation2}.
+ * Verifies that the critical part of {@code update1} completely blocks the
+ * critical part of {@code update2}.
*/
void blocks(UpdateAction update2)
{
@@ -483,18 +574,50 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
waitFor(update1.critical.isReadyToFinish);
Assert.assertEquals(1, update2.critical.isReadyToStart.getCount());
- // Finish update 1 and verify that update 2 critical has now started
+ // Finish update 1 critical and verify that update 2 is now ready to start
update1.critical.finish.countDown();
waitFor(update2.critical.isReadyToStart);
+ // Finish update 1
update1.waitToFinishAndVerify();
+ // Start and finish update2
update2.critical.start.countDown();
waitFor(update2.critical.isReadyToFinish);
update2.critical.finish.countDown();
+ update2.waitToFinishAndVerify();
+
+ executor.shutdownNow();
+ }
+
+ /**
+ * Verifies that the critical part of {@code update1} does not
+ * block the critical part of {@code update2}.
+ */
+ void doesNotBlock(UpdateAction update2)
+ {
+ final ExecutorService executor = Execs.multiThreaded(2, "TaskQueueConcurrencyTest-%s");
+
+ // Start update 1 and wait for it to enter critical section
+ executor.submit(update1::perform);
+ waitFor(update1.critical.isReadyToStart);
+
+ // Start update2 and verify that it has also entered critical section
+ executor.submit(update2::perform);
+ waitFor(update2.critical.isReadyToStart);
+ // Finish update2 to prove that it is not blocked by update1
+ update2.critical.start.countDown();
+ waitFor(update2.critical.isReadyToFinish);
+ update2.critical.finish.countDown();
update2.waitToFinishAndVerify();
+ // Start and finish update1
+ update1.critical.start.countDown();
+ waitFor(update1.critical.isReadyToFinish);
+ update1.critical.finish.countDown();
+ update1.waitToFinishAndVerify();
+
executor.shutdownNow();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]