This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 31.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/31.0.0 by this push:
     new 1435b9f4bdc Dart: Skip final getCounters, postFinish to idle historicals. (#17255) (#17259)
1435b9f4bdc is described below

commit 1435b9f4bdcfee19e7b4268ae33ec53462da8e17
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Oct 5 19:32:21 2024 +0530

    Dart: Skip final getCounters, postFinish to idle historicals. (#17255) (#17259)
    
    In a Dart query, all Historicals are given worker IDs, but not all of them
    are going to actually be started or receive work orders.
    
    Attempting to send a getCounters or postFinish command to a worker that
    never received a work order is not only wasteful but also causes errors,
    because those workers do not know about that query ID.
    
    Co-authored-by: Gian Merlino <[email protected]>
---
 .../org/apache/druid/msq/exec/ControllerImpl.java    | 20 +++++++++++---------
 .../msq/kernel/controller/ControllerQueryKernel.java | 15 +++++++++++++++
 .../kernel/controller/ControllerStageTracker.java    | 14 ++++++--------
 3 files changed, 32 insertions(+), 17 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 60e0910e15b..936ba4af44d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -432,8 +432,10 @@ public class ControllerImpl implements Controller
       }
     }
     if (queryKernel != null && queryKernel.isSuccess()) {
-      // If successful, encourage the tasks to exit successfully.
-      postFinishToAllTasks();
+      // If successful, encourage workers to exit successfully.
+      // Only send this command to participating workers. For task-based queries, this is all tasks, since tasks
+      // are launched only when needed. For Dart, this is any servers that were actually assigned work items.
+      postFinishToWorkers(queryKernel.getAllParticipatingWorkers());
       workerManager.stop(false);
     } else {
       // If not successful, cancel running tasks.
@@ -1456,15 +1458,15 @@ public class ControllerImpl implements Controller
     return IntervalUtils.difference(replaceIntervals, publishIntervals);
   }
 
-  private CounterSnapshotsTree getCountersFromAllTasks()
+  private CounterSnapshotsTree fetchCountersFromWorkers(final IntSet workers)
   {
     final CounterSnapshotsTree retVal = new CounterSnapshotsTree();
     final List<String> taskList = getWorkerIds();
 
     final List<ListenableFuture<CounterSnapshotsTree>> futures = new 
ArrayList<>();
 
-    for (String taskId : taskList) {
-      futures.add(netClient.getCounters(taskId));
+    for (int workerNumber : workers) {
+      futures.add(netClient.getCounters(taskList.get(workerNumber)));
     }
 
     final List<CounterSnapshotsTree> snapshotsTrees =
@@ -1477,14 +1479,14 @@ public class ControllerImpl implements Controller
     return retVal;
   }
 
-  private void postFinishToAllTasks()
+  private void postFinishToWorkers(final IntSet workers)
   {
     final List<String> taskList = getWorkerIds();
 
     final List<ListenableFuture<Void>> futures = new ArrayList<>();
 
-    for (String taskId : taskList) {
-      futures.add(netClient.postFinish(taskId));
+    for (int workerNumber : workers) {
+      futures.add(netClient.postFinish(taskList.get(workerNumber)));
     }
 
     FutureUtils.getUnchecked(MSQFutureUtils.allAsList(futures, true), true);
@@ -1499,7 +1501,7 @@ public class ControllerImpl implements Controller
   private CounterSnapshotsTree getFinalCountersSnapshot(@Nullable final 
ControllerQueryKernel queryKernel)
   {
     if (queryKernel != null && queryKernel.isSuccess()) {
-      return getCountersFromAllTasks();
+      return 
fetchCountersFromWorkers(queryKernel.getAllParticipatingWorkers());
     } else {
       return makeCountersSnapshotForLiveReports();
     }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
index 16ed68211d5..b0200234b40 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
@@ -27,6 +27,7 @@ import it.unimi.dsi.fastutil.ints.Int2IntAVLTreeMap;
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.frame.key.ClusterByPartitions;
 import org.apache.druid.java.util.common.IAE;
@@ -643,6 +644,20 @@ public class ControllerQueryKernel
     doWithStageTracker(stageId, ControllerStageTracker::fail);
   }
 
+  /**
+   * Returns the set of all worker numbers that have participated in work done so far by this query.
+   */
+  public IntSet getAllParticipatingWorkers()
+  {
+    final IntSet retVal = new IntAVLTreeSet();
+
+    for (final ControllerStageTracker tracker : stageTrackers.values()) {
+      retVal.addAll(tracker.getWorkerInputs().workers());
+    }
+
+    return retVal;
+  }
+
   /**
    * Fetches and returns the stage kernel corresponding to the provided stage 
id, else throws {@link IAE}
    */
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
index 0a62ba24b63..338a35e0d24 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
@@ -63,7 +63,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.stream.IntStream;
 
 /**
  * Controller-side state machine for each stage. Used by {@link 
ControllerQueryKernel} to form the overall state
@@ -137,7 +136,7 @@ class ControllerStageTracker
     this.workerInputs = workerInputs;
     this.maxRetainedPartitionSketchBytes = maxRetainedPartitionSketchBytes;
 
-    initializeWorkerState(workerCount);
+    initializeWorkerState(workerInputs.workers());
 
     if (stageDef.mustGatherResultKeyStatistics()) {
       this.completeKeyStatisticsInformation =
@@ -149,14 +148,13 @@ class ControllerStageTracker
   }
 
   /**
-   * Initialize stage for each worker to {@link ControllerWorkerStagePhase#NEW}
-   *
-   * @param workerCount
+   * Initialize stage for each worker to {@link ControllerWorkerStagePhase#NEW}.
    */
-  private void initializeWorkerState(int workerCount)
+  private void initializeWorkerState(IntSet workers)
   {
-    IntStream.range(0, workerCount)
-             .forEach(wokerNumber -> workerToPhase.put(wokerNumber, 
ControllerWorkerStagePhase.NEW));
+    for (int workerNumber : workers) {
+      workerToPhase.put(workerNumber, ControllerWorkerStagePhase.NEW);
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to