This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b9c6fdc12c3 Add more concurrency unit tests for TaskQueue (#18022)
b9c6fdc12c3 is described below

commit b9c6fdc12c330c77298e2f6e915a28eb18126ff9
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed May 21 19:00:26 2025 +0530

    Add more concurrency unit tests for TaskQueue (#18022)
---
 .../overlord/TaskQueueConcurrencyTest.java         | 137 +++++++++++++++++++--
 1 file changed, 130 insertions(+), 7 deletions(-)

diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
index bcd75090f00..fc80c3f94c0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
@@ -121,7 +121,7 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
     };
   }
 
-  @Test
+  @Test(timeout = 20_000L)
   public void test_start_blocks_add_forAnyTaskId()
   {
     // Add task1 to storage and mark it as running
@@ -145,7 +145,7 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
     );
   }
 
-  @Test
+  @Test(timeout = 20_000L)
   public void test_add_blocks_stop()
   {
     taskQueue.setActive(true);
@@ -166,7 +166,7 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
     );
   }
 
-  @Test
+  @Test(timeout = 20_000L)
   public void test_add_blocks_syncFromStorage_forSameTaskId()
   {
     taskQueue.setActive(true);
@@ -189,7 +189,7 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
     );
   }
 
-  @Test
+  @Test(timeout = 20_000L)
   public void test_syncFromStorage_blocks_add_forSameTaskId()
   {
     final String taskId = "t2";
@@ -309,6 +309,97 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
     );
   }
 
+  @Test(timeout = 20_000L)
+  public void test_add_doesNotBlock_add_forDifferentTaskId()
+  {
+    taskQueue.setActive(true);
+
+    final Task task1 = createTask("t1");
+    final Task task2 = createTask("t2");
+
+    ActionVerifier.verifyThat(
+        update(
+            () -> taskQueue.add(task1)
+        ).withEndState(
+            () -> Assert.assertEquals(
+                Optional.of(task1),
+                taskQueue.getActiveTask(task1.getId())
+            )
+        )
+    ).doesNotBlock(
+        update(
+            () -> taskQueue.add(task2)
+        ).withEndState(
+            () -> Assert.assertEquals(
+                Optional.of(task2),
+                taskQueue.getActiveTask(task2.getId())
+            )
+        )
+    );
+  }
+
+  @Test(timeout = 20_000L)
+  public void test_add_doesNotBlock_shutdown_forDifferentTaskId()
+  {
+    taskQueue.setActive(true);
+
+    final Task task1 = createTask("t1");
+    taskQueue.add(task1);
+
+    final Task task2 = createTask("t2");
+
+    ActionVerifier.verifyThat(
+        update(
+            () -> taskQueue.add(task2)
+        ).withEndState(
+            () -> Assert.assertEquals(
+                Optional.of(task2),
+                taskQueue.getActiveTask(task2.getId())
+            )
+        )
+    ).doesNotBlock(
+        update(
+            () -> taskQueue.shutdown(task1.getId(), "killed")
+        ).withEndState(
+            () -> Assert.assertEquals(
+                Optional.of(TaskStatus.failure(task1.getId(), "killed")),
+                taskQueue.getTaskStatus(task1.getId())
+            )
+        )
+    );
+  }
+
+  @Test(timeout = 20_000L)
+  public void test_shutdown_doesNotBlock_add_forDifferentTaskId()
+  {
+    taskQueue.setActive(true);
+
+    final Task task1 = createTask("t1");
+    taskQueue.add(task1);
+
+    final Task task2 = createTask("t2");
+
+    ActionVerifier.verifyThat(
+        update(
+            () -> taskQueue.shutdown(task1.getId(), "killed")
+        ).withEndState(
+            () -> Assert.assertEquals(
+                Optional.of(TaskStatus.failure(task1.getId(), "killed")),
+                taskQueue.getTaskStatus(task1.getId())
+            )
+        )
+    ).doesNotBlock(
+        update(
+            () -> taskQueue.add(task2)
+        ).withEndState(
+            () -> Assert.assertEquals(
+                Optional.of(task2),
+                taskQueue.getActiveTask(task2.getId())
+            )
+        )
+    );
+  }
+
   private UpdateAction update(Action action)
   {
     return new UpdateAction(action, threadToUpdateAction::put);
@@ -454,8 +545,8 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
     }
 
     /**
-     * Verifies that the critical update section of {@code operation1} completely
-     * blocks the critical update in {@code operation2}.
+     * Verifies that the critical part of {@code update1} completely blocks the
+     * critical part of {@code update2}.
      */
     void blocks(UpdateAction update2)
     {
@@ -483,18 +574,50 @@ public class TaskQueueConcurrencyTest extends IngestionTestBase
       waitFor(update1.critical.isReadyToFinish);
       Assert.assertEquals(1, update2.critical.isReadyToStart.getCount());
 
-      // Finish update 1 and verify that update 2 critical has now started
+      // Finish update 1 critical and verify that update 2 is now ready to start
       update1.critical.finish.countDown();
       waitFor(update2.critical.isReadyToStart);
 
+      // Finish update 1
       update1.waitToFinishAndVerify();
 
+      // Start and finish update2
       update2.critical.start.countDown();
       waitFor(update2.critical.isReadyToFinish);
       update2.critical.finish.countDown();
+      update2.waitToFinishAndVerify();
+
+      executor.shutdownNow();
+    }
+
+    /**
+     * Verifies that the critical part of {@code update1} does not
+     * block the critical part of {@code update2}.
+     */
+    void doesNotBlock(UpdateAction update2)
+    {
+      final ExecutorService executor = Execs.multiThreaded(2, "TaskQueueConcurrencyTest-%s");
+
+      // Start update 1 and wait for it to enter critical section
+      executor.submit(update1::perform);
+      waitFor(update1.critical.isReadyToStart);
+
+      // Start update2 and verify that it has also entered critical section
+      executor.submit(update2::perform);
+      waitFor(update2.critical.isReadyToStart);
 
+      // Finish update2 to prove that it is not blocked by update1
+      update2.critical.start.countDown();
+      waitFor(update2.critical.isReadyToFinish);
+      update2.critical.finish.countDown();
       update2.waitToFinishAndVerify();
 
+      // Start and finish update1
+      update1.critical.start.countDown();
+      waitFor(update1.critical.isReadyToFinish);
+      update1.critical.finish.countDown();
+      update1.waitToFinishAndVerify();
+
       executor.shutdownNow();
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to