This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f037776fd8 MSQ: Launch initial tasks faster. (#13393)
f037776fd8 is described below
commit f037776fd8028b52ebd5ecfaf7bc336abda8ab0d
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Nov 21 05:41:18 2022 -0800
MSQ: Launch initial tasks faster. (#13393)
Notify the mainLoop thread to skip a sleep when the desired task
count changes.
---
.../apache/druid/msq/indexing/MSQWorkerTaskLauncher.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index 2273822402..3ca45c7419 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -93,9 +93,14 @@ public class MSQWorkerTaskLauncher
private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
private final AtomicBoolean cancelTasksOnStop = new AtomicBoolean();
+ // Set by launchTasksIfNeeded.
@GuardedBy("taskIds")
private int desiredTaskCount = 0;
+ // Set by the main loop when it acknowledges a new desiredTaskCount.
+ @GuardedBy("taskIds")
+ private int acknowledgedDesiredTaskCount = 0;
+
// Worker number -> task ID.
@GuardedBy("taskIds")
private final List<String> taskIds = new ArrayList<>();
@@ -208,6 +213,7 @@ public class MSQWorkerTaskLauncher
synchronized (taskIds) {
if (taskCount > desiredTaskCount) {
desiredTaskCount = taskCount;
+ taskIds.notifyAll();
}
while (taskIds.size() < taskCount || !IntStream.range(0, taskCount).allMatch(fullyStartedTasks::contains)) {
@@ -325,6 +331,7 @@ public class MSQWorkerTaskLauncher
synchronized (taskIds) {
firstTask = taskIds.size();
taskCount = desiredTaskCount;
+ acknowledgedDesiredTaskCount = desiredTaskCount;
}
for (int i = firstTask; i < taskCount; i++) {
@@ -460,7 +467,11 @@ public class MSQWorkerTaskLauncher
} else {
// wait on taskIds so we can wake up early if needed.
synchronized (taskIds) {
- taskIds.wait(sleepMillis);
+ // desiredTaskCount is set by launchTasksIfNeeded, and acknowledgedDesiredTaskCount is set by mainLoop when
+ // it acknowledges a new target. If these are not equal, do another run immediately and launch more tasks.
+ if (acknowledgedDesiredTaskCount == desiredTaskCount) {
+ taskIds.wait(sleepMillis);
+ }
}
}
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]