This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch 31.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/31.0.0 by this push:
new 1435b9f4bdc Dart: Skip final getCounters, postFinish to idle
historicals. (#17255) (#17259)
1435b9f4bdc is described below
commit 1435b9f4bdcfee19e7b4268ae33ec53462da8e17
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Oct 5 19:32:21 2024 +0530
Dart: Skip final getCounters, postFinish to idle historicals. (#17255)
(#17259)
In a Dart query, all Historicals are given worker IDs, but not all of them
are going to actually be started or receive work orders.
Attempting to send a getCounters or postFinish command to a worker that
never received a work order is not only wasteful, but also causes errors,
because such a worker does not know about that query ID.
Co-authored-by: Gian Merlino <[email protected]>
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 20 +++++++++++---------
.../msq/kernel/controller/ControllerQueryKernel.java | 15 +++++++++++++++
.../kernel/controller/ControllerStageTracker.java | 14 ++++++--------
3 files changed, 32 insertions(+), 17 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 60e0910e15b..936ba4af44d 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -432,8 +432,10 @@ public class ControllerImpl implements Controller
}
}
if (queryKernel != null && queryKernel.isSuccess()) {
- // If successful, encourage the tasks to exit successfully.
- postFinishToAllTasks();
+ // If successful, encourage workers to exit successfully.
+ // Only send this command to participating workers. For task-based queries, this is all tasks, since tasks
+ // are launched only when needed. For Dart, this is any servers that were actually assigned work items.
+ postFinishToWorkers(queryKernel.getAllParticipatingWorkers());
workerManager.stop(false);
} else {
// If not successful, cancel running tasks.
@@ -1456,15 +1458,15 @@ public class ControllerImpl implements Controller
return IntervalUtils.difference(replaceIntervals, publishIntervals);
}
- private CounterSnapshotsTree getCountersFromAllTasks()
+ private CounterSnapshotsTree fetchCountersFromWorkers(final IntSet workers)
{
final CounterSnapshotsTree retVal = new CounterSnapshotsTree();
final List<String> taskList = getWorkerIds();
final List<ListenableFuture<CounterSnapshotsTree>> futures = new
ArrayList<>();
- for (String taskId : taskList) {
- futures.add(netClient.getCounters(taskId));
+ for (int workerNumber : workers) {
+ futures.add(netClient.getCounters(taskList.get(workerNumber)));
}
final List<CounterSnapshotsTree> snapshotsTrees =
@@ -1477,14 +1479,14 @@ public class ControllerImpl implements Controller
return retVal;
}
- private void postFinishToAllTasks()
+ private void postFinishToWorkers(final IntSet workers)
{
final List<String> taskList = getWorkerIds();
final List<ListenableFuture<Void>> futures = new ArrayList<>();
- for (String taskId : taskList) {
- futures.add(netClient.postFinish(taskId));
+ for (int workerNumber : workers) {
+ futures.add(netClient.postFinish(taskList.get(workerNumber)));
}
FutureUtils.getUnchecked(MSQFutureUtils.allAsList(futures, true), true);
@@ -1499,7 +1501,7 @@ public class ControllerImpl implements Controller
private CounterSnapshotsTree getFinalCountersSnapshot(@Nullable final
ControllerQueryKernel queryKernel)
{
if (queryKernel != null && queryKernel.isSuccess()) {
- return getCountersFromAllTasks();
+ return
fetchCountersFromWorkers(queryKernel.getAllParticipatingWorkers());
} else {
return makeCountersSnapshotForLiveReports();
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
index 16ed68211d5..b0200234b40 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
@@ -27,6 +27,7 @@ import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.java.util.common.IAE;
@@ -643,6 +644,20 @@ public class ControllerQueryKernel
doWithStageTracker(stageId, ControllerStageTracker::fail);
}
+ /**
+ * Returns the set of all worker numbers that have participated in work done so far by this query.
+ */
+ public IntSet getAllParticipatingWorkers()
+ {
+ final IntSet retVal = new IntAVLTreeSet();
+
+ for (final ControllerStageTracker tracker : stageTrackers.values()) {
+ retVal.addAll(tracker.getWorkerInputs().workers());
+ }
+
+ return retVal;
+ }
+
/**
* Fetches and returns the stage kernel corresponding to the provided stage
id, else throws {@link IAE}
*/
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
index 0a62ba24b63..338a35e0d24 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
@@ -63,7 +63,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.stream.IntStream;
/**
* Controller-side state machine for each stage. Used by {@link
ControllerQueryKernel} to form the overall state
@@ -137,7 +136,7 @@ class ControllerStageTracker
this.workerInputs = workerInputs;
this.maxRetainedPartitionSketchBytes = maxRetainedPartitionSketchBytes;
- initializeWorkerState(workerCount);
+ initializeWorkerState(workerInputs.workers());
if (stageDef.mustGatherResultKeyStatistics()) {
this.completeKeyStatisticsInformation =
@@ -149,14 +148,13 @@ class ControllerStageTracker
}
/**
- * Initialize stage for each worker to {@link ControllerWorkerStagePhase#NEW}
- *
- * @param workerCount
+ * Initialize stage for each worker to {@link ControllerWorkerStagePhase#NEW}.
*/
- private void initializeWorkerState(int workerCount)
+ private void initializeWorkerState(IntSet workers)
{
- IntStream.range(0, workerCount)
- .forEach(wokerNumber -> workerToPhase.put(wokerNumber,
ControllerWorkerStagePhase.NEW));
+ for (int workerNumber : workers) {
+ workerToPhase.put(workerNumber, ControllerWorkerStagePhase.NEW);
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]