This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f037776fd8 MSQ: Launch initial tasks faster. (#13393)
f037776fd8 is described below

commit f037776fd8028b52ebd5ecfaf7bc336abda8ab0d
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Nov 21 05:41:18 2022 -0800

    MSQ: Launch initial tasks faster. (#13393)
    
    Notify the mainLoop thread to skip a sleep when the desired task
    count changes.
---
 .../apache/druid/msq/indexing/MSQWorkerTaskLauncher.java    | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index 2273822402..3ca45c7419 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -93,9 +93,14 @@ public class MSQWorkerTaskLauncher
   private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
   private final AtomicBoolean cancelTasksOnStop = new AtomicBoolean();
 
+  // Set by launchTasksIfNeeded.
   @GuardedBy("taskIds")
   private int desiredTaskCount = 0;
 
+  // Set by the main loop when it acknowledges a new desiredTaskCount.
+  @GuardedBy("taskIds")
+  private int acknowledgedDesiredTaskCount = 0;
+
   // Worker number -> task ID.
   @GuardedBy("taskIds")
   private final List<String> taskIds = new ArrayList<>();
@@ -208,6 +213,7 @@ public class MSQWorkerTaskLauncher
     synchronized (taskIds) {
       if (taskCount > desiredTaskCount) {
         desiredTaskCount = taskCount;
+        taskIds.notifyAll();
       }
 
       while (taskIds.size() < taskCount || !IntStream.range(0, taskCount).allMatch(fullyStartedTasks::contains)) {
@@ -325,6 +331,7 @@ public class MSQWorkerTaskLauncher
     synchronized (taskIds) {
       firstTask = taskIds.size();
       taskCount = desiredTaskCount;
+      acknowledgedDesiredTaskCount = desiredTaskCount;
     }
 
     for (int i = firstTask; i < taskCount; i++) {
@@ -460,7 +467,11 @@ public class MSQWorkerTaskLauncher
       } else {
         // wait on taskIds so we can wake up early if needed.
         synchronized (taskIds) {
-          taskIds.wait(sleepMillis);
+          // desiredTaskCount is set by launchTasksIfNeeded, and acknowledgedDesiredTaskCount is set by mainLoop when
+          // it acknowledges a new target. If these are not equal, do another run immediately and launch more tasks.
+          if (acknowledgedDesiredTaskCount == desiredTaskCount) {
+            taskIds.wait(sleepMillis);
+          }
         }
       }
     } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to