zachjsh commented on code in PR #14782:
URL: https://github.com/apache/druid/pull/14782#discussion_r1289532913


##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -112,69 +112,112 @@ public KillUnusedSegments(
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
   {
+
+    TaskStats taskStats = TaskStats.EMPTY;

Review Comment:
   Good catch! I decided to keep the class, if that's OK with you.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -199,86 +242,33 @@ private Interval findIntervalForKill(String dataSource)
     }
   }
 
-  private int getAvailableKillTaskSlots(double killTaskSlotRatio, int 
maxKillTaskSlots)
+  private int getAvailableKillTaskSlots(int killTaskCapacity, int 
numActiveKillTasks)
   {
     return Math.max(
         0,
-        getKillTaskCapacity(getTotalWorkerCapacity(), killTaskSlotRatio, 
maxKillTaskSlots) - getNumActiveKillTaskSlots()
+        killTaskCapacity - numActiveKillTasks
     );
   }
 
-  /**
-   * Get the number of active kill task slots in use. The kill tasks counted, 
are only those thare are submitted
-   * by this coordinator duty (have prefix {@link 
KillUnusedSegments#TASK_ID_PREFIX}. The value returned here
-   * may be an overestimate, as in some cased the taskType can be null if 
middleManagers are running with an older
-   * version, and these tasks are counted as active kill tasks to be safe.
-   * @return
-   */
-  private int getNumActiveKillTaskSlots()
+  @VisibleForTesting
+  static int getKillTaskCapacity(int totalWorkerCapacity, double 
killTaskSlotRatio, int maxKillTaskSlots)
   {
-    final CloseableIterator<TaskStatusPlus> activeTasks =
-        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0), 
true);
-    // Fetch currently running kill tasks
-    int numActiveKillTasks = 0;
-
-    try (final Closer closer = Closer.create()) {
-      closer.register(activeTasks);
-      while (activeTasks.hasNext()) {
-        final TaskStatusPlus status = activeTasks.next();
-
-        // taskType can be null if middleManagers are running with an older 
version. Here, we consevatively regard
-        // the tasks of the unknown taskType as the killTask. This is because 
it's important to not run
-        // killTasks more than the configured limit at any time which might 
impact to the ingestion
-        // performance.
-        if (status.getType() == null
-            || (KILL_TASK_TYPE.equals(status.getType()) && 
status.getId().startsWith(TASK_ID_PREFIX))) {
-          numActiveKillTasks++;
-        }
-      }
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    return numActiveKillTasks;
+    return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 
1.0)), maxKillTaskSlots);
   }
 
-  private int getTotalWorkerCapacity()
+  static class TaskStats
   {
-    int totalWorkerCapacity;
-    try {
-      final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
-          FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
-      totalWorkerCapacity = 
workerCapacityInfo.getMaximumCapacityWithAutoScale();
-      if (totalWorkerCapacity < 0) {
-        totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
-      }
-    }
-    catch (ExecutionException e) {
-      // Call to getTotalWorkerCapacity may fail during a rolling upgrade: API 
was added in 0.23.0.
-      if (e.getCause() instanceof HttpResponseException
-          && ((HttpResponseException) 
e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
-        log.noStackTrace().warn(e, "Call to getTotalWorkerCapacity failed. 
Falling back to getWorkers.");
-        totalWorkerCapacity =
-            FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
-                .stream()
-                .mapToInt(worker -> worker.getWorker().getCapacity())
-                .sum();
-      } else {
-        throw new RuntimeException(e.getCause());
-      }
-    }
-    catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    }
 
-    return totalWorkerCapacity;
-  }
+    static final TaskStats EMPTY = new TaskStats();
+    int availableTaskSlots;
+    int maxSlots;
+    int submittedTasks;
 
-  @VisibleForTesting
-  static int getKillTaskCapacity(int totalWorkerCapacity, double 
killTaskSlotRatio, int maxKillTaskSlots)
-  {
-    return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 
1.0)), maxKillTaskSlots);
+    TaskStats()
+    {
+      availableTaskSlots = 0;
+      maxSlots = 0;
+      submittedTasks = 0;
+    }

Review Comment:
   I decided to keep the class, if that's OK with you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to