suneet-s commented on code in PR #14769:
URL: https://github.com/apache/druid/pull/14769#discussion_r1286266136


##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -158,6 +164,20 @@ public CoordinatorDynamicConfig(
     this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
     this.specificDataSourcesToKillUnusedSegmentsIn
         = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn);
+    if (null != killTaskSlotRatio && (killTaskSlotRatio < 0 || killTaskSlotRatio > 1)) {
+      throw InvalidInput.exception(
+          "killTaskSlotRatio [%.2f] is invalid. It must be >= 0 and <= 1.",
+          killTaskSlotRatio
+      );
+    }
+    this.killTaskSlotRatio = killTaskSlotRatio != null ? killTaskSlotRatio : Defaults.KILL_TASK_SLOT_RATIO;
+    if (null != maxKillTaskSlots && maxKillTaskSlots < 0) {
+      throw InvalidInput.exception(
+          "maxKillTaskSlots [%d] is invalid. It must be > 0.",

Review Comment:
   ```suggestion
             "maxKillTaskSlots [%d] is invalid. It must be >= 0.",
   ```
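
   For illustration, a minimal standalone sketch of both range checks with messages that match their conditions. `KillConfigValidation` and the use of `IllegalArgumentException` are stand-ins assumed here so the snippet compiles on its own; the PR itself throws via `InvalidInput.exception`.

```java
// Hypothetical standalone helper for illustration; the PR uses InvalidInput.exception instead.
final class KillConfigValidation
{
  private KillConfigValidation() {}

  // A null argument means "not set"; the caller is expected to fall back to a default.
  static void validate(Double killTaskSlotRatio, Integer maxKillTaskSlots)
  {
    if (killTaskSlotRatio != null && (killTaskSlotRatio < 0 || killTaskSlotRatio > 1)) {
      throw new IllegalArgumentException(
          String.format("killTaskSlotRatio [%.2f] is invalid. It must be >= 0 and <= 1.", killTaskSlotRatio)
      );
    }
    // Zero is accepted, so the message says ">= 0" rather than "> 0".
    if (maxKillTaskSlots != null && maxKillTaskSlots < 0) {
      throw new IllegalArgumentException(
          String.format("maxKillTaskSlots [%d] is invalid. It must be >= 0.", maxKillTaskSlots)
      );
    }
  }
}
```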



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -174,4 +198,70 @@ private Interval findIntervalForKill(String dataSource)
     }
   }
 
+  private int getAvailableKillTaskSlots(double killTaskSlotRatio, int maxKillTaskSlots)
+  {
+    return Math.max(0, getKillTaskCapacity(killTaskSlotRatio, maxKillTaskSlots) - getNumActiveKillTaskSlots());
+  }
+
+  private int getNumActiveKillTaskSlots()
+  {
+    final CloseableIterator<TaskStatusPlus> activeTasks =
+        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0), true);
+    // Fetch currently running kill tasks
+    int numActiveKillTasks = 0;
+
+    try (final Closer closer = Closer.create()) {
+      closer.register(activeTasks);
+      while (activeTasks.hasNext()) {
+        final TaskStatusPlus status = activeTasks.next();
+
+        // taskType can be null if middleManagers are running with an older version. Here, we conservatively regard

Review Comment:
   > taskType can be null if middleManagers are running with an older version.
   
   Do we know in which version the middle managers would report the task type as
   null? I saw the same comment in the compact segments duty, and it dates from mid
   2018, so I wonder whether this null check is still that important.



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -495,6 +533,8 @@ private static class Defaults
     static final int MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE;
     static final boolean USE_ROUND_ROBIN_ASSIGNMENT = true;
     static final boolean SMART_SEGMENT_LOADING = true;
+    static final double KILL_TASK_SLOT_RATIO = 1.0;
+    static final int MAX_KILL_TASK_SLOTS = Integer.MAX_VALUE;

Review Comment:
   Please add a comment here noting that these defaults preserve the behavior from
   before Druid 0.28, and that a future version may want to consider better defaults
   so that kill tasks cannot eat up all the capacity in the cluster.
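
   To make the concern concrete, here is a rough sketch of how a ratio and an absolute cap could combine into a kill task capacity. The class `KillTaskCapacity`, the method `compute`, and the exact formula are assumptions for illustration; the body of `getKillTaskCapacity` is not shown in this review.

```java
// Hypothetical helper, not the PR's code: caps kill task slots by a ratio and by an absolute maximum.
final class KillTaskCapacity
{
  private KillTaskCapacity() {}

  static int compute(int totalWorkerCapacity, double killTaskSlotRatio, int maxKillTaskSlots)
  {
    // Clamp the ratio to [0, 1] so it can never yield more slots than the cluster has.
    final double ratio = Math.max(0.0, Math.min(killTaskSlotRatio, 1.0));
    // Truncate toward zero, then apply the absolute cap.
    return Math.min((int) (totalWorkerCapacity * ratio), maxKillTaskSlots);
  }
}
```

   Under this assumed formula, the defaults `1.0` and `Integer.MAX_VALUE` on a cluster with 10 task slots give `compute(10, 1.0, Integer.MAX_VALUE) == 10`, so kill tasks may occupy every slot, whereas a ratio of `0.25` gives 2.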



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -174,4 +199,79 @@ private Interval findIntervalForKill(String dataSource)
     }
   }
 
+  private int getAvailableKillTaskSlots(double killTaskSlotRatio, int maxKillTaskSlots)
+  {
+    return Math.max(
+        0,
+        getKillTaskCapacity(getTotalWorkerCapacity(), killTaskSlotRatio, maxKillTaskSlots) - getNumActiveKillTaskSlots()
+    );
+  }
+
+  private int getNumActiveKillTaskSlots()
+  {
+    final CloseableIterator<TaskStatusPlus> activeTasks =
+        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0), true);
+    // Fetch currently running kill tasks
+    int numActiveKillTasks = 0;
+
+    try (final Closer closer = Closer.create()) {
+      closer.register(activeTasks);
+      while (activeTasks.hasNext()) {
+        final TaskStatusPlus status = activeTasks.next();
+
+        // taskType can be null if middleManagers are running with an older version. Here, we conservatively regard
+        // tasks of unknown taskType as kill tasks, because it is important never to run more kill tasks than the
+        // configured limit, which might impact ingestion performance.
+        if (status.getType() == null
+            || (KILL_TASK_TYPE.equals(status.getType()) && status.getId().startsWith(TASK_ID_PREFIX))) {
+          numActiveKillTasks++;
+        }
+      }
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return numActiveKillTasks;
+  }
+
+  private int getTotalWorkerCapacity()

Review Comment:
   A lot of this code seems very similar to the code in `CompactSegments`. Can we
   refactor it so that it can be reused in both duties?
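
   One possible shape for such a refactor, as a rough sketch only: a shared counter that takes the duty-specific match as a predicate. `ActiveTaskCounter` and `TaskInfo` are hypothetical names, and `TaskInfo` is a minimal stand-in for the two `TaskStatusPlus` fields used in the diff, so that the snippet compiles without Druid on the classpath.

```java
import java.util.Iterator;
import java.util.function.Predicate;

// Hypothetical shared helper; the names here are illustrative and not from the Druid codebase.
final class ActiveTaskCounter
{
  private ActiveTaskCounter() {}

  // Minimal stand-in for the id and type fields of TaskStatusPlus.
  static final class TaskInfo
  {
    final String id;
    final String type; // may be null when the task type is unknown

    TaskInfo(String id, String type)
    {
      this.id = id;
      this.type = type;
    }
  }

  // Counts tasks that match the duty-specific predicate, plus tasks of unknown type.
  static int countActive(Iterator<TaskInfo> tasks, Predicate<TaskInfo> isOwnTask)
  {
    int count = 0;
    while (tasks.hasNext()) {
      final TaskInfo task = tasks.next();
      // Mirrors the null check in the diff: unknown types are counted conservatively.
      if (task.type == null || isOwnTask.test(task)) {
        count++;
      }
    }
    return count;
  }
}
```

   Each duty would then pass its own predicate, for example one that matches on the kill task type and the task ID prefix, while the iteration and the null handling live in one place.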



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -102,6 +114,13 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
     Collection<String> dataSourcesToKill =
         params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
+    double killTaskSlotRatio = params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
+    int maxKillTaskSlots = params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
+    int availableKillTaskSlots = getAvailableKillTaskSlots(killTaskSlotRatio, maxKillTaskSlots);
+    if (0 == availableKillTaskSlots) {
+      log.warn("Not killing any unused segments because there are no available kill task slots at this time.");

Review Comment:
   Does this need to be a warning? It seems like it could be quite verbose if
   there are long-running kill tasks and the KillUnusedSegments duty is scheduled
   to run frequently.
   
   Perhaps a better way of surfacing this information is to emit the number of
   kill tasks and the calculated kill capacity as metrics, so that operators can
   look at these metrics to decide whether they need more kill capacity.
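
   A rough sketch of what emitting these values could look like. `KillSlotMetrics`, the `MetricSink` interface, and the metric names are all hypothetical and chosen for illustration; a real change would go through the emitter already used by the coordinator duties.

```java
// Hypothetical sketch; MetricSink stands in for whatever emitter the duty has access to.
final class KillSlotMetrics
{
  private KillSlotMetrics() {}

  interface MetricSink
  {
    void emit(String metric, long value);
  }

  // Emits capacity, active, and available slot counts, then returns the available count.
  static int reportAndGetAvailable(MetricSink sink, int killTaskCapacity, int activeKillTasks)
  {
    final int available = Math.max(0, killTaskCapacity - activeKillTasks);
    // Metric names below are made up for this example.
    sink.emit("killTask/capacity/count", killTaskCapacity);
    sink.emit("killTask/active/count", activeKillTasks);
    sink.emit("killTask/available/count", available);
    return available;
  }
}
```

   With the numbers available as metrics, the "no available slots" case would not need a warning on every run and could be logged at a lower level.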



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
