jasonk000 commented on code in PR #14782:
URL: https://github.com/apache/druid/pull/14782#discussion_r1289015971


##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -61,6 +58,9 @@ public class KillUnusedSegments implements CoordinatorDuty
 {
   public static final String KILL_TASK_TYPE = "kill";
   public static final String TASK_ID_PREFIX = "coordinator-issued";
+  public static final Predicate<TaskStatusPlus> TASK_PREDICATE =

Review Comment:
   Similar here: I think we should give this a clearer name, e.g.
`IS_KILL_TASK`?



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -112,69 +112,112 @@ public KillUnusedSegments(
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
   {
+
+    TaskStats taskStats = TaskStats.EMPTY;
     Collection<String> dataSourcesToKill =
         
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
     double killTaskSlotRatio = 
params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
     int maxKillTaskSlots = 
params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int availableKillTaskSlots = getAvailableKillTaskSlots(killTaskSlotRatio, 
maxKillTaskSlots);
-    if (0 == availableKillTaskSlots) {
-      log.debug("Not killing any unused segments because there are no 
available kill task slots at this time.");
-      return params;
-    }
+    int killTaskCapacity = getKillTaskCapacity(
+        CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
+        killTaskSlotRatio,
+        maxKillTaskSlots
+    );
+    int availableKillTaskSlots = getAvailableKillTaskSlots(
+        killTaskCapacity,
+        CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, 
TASK_PREDICATE).size()
+    );
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    // If no datasource has been specified, all are eligible for killing 
unused segments
-    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
-    }
+    taskStats.availableTaskSlots = availableKillTaskSlots;
+    taskStats.maxSlots = killTaskCapacity;
 
-    final long currentTimeMillis = System.currentTimeMillis();
-    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      log.debug("No eligible datasource to kill unused segments.");
-    } else if (lastKillTime + period > currentTimeMillis) {
-      log.debug("Skipping kill of unused segments as kill period has not 
elapsed yet.");
-    } else {
-      log.debug("Killing unused segments in datasources: %s", 
dataSourcesToKill);
-      lastKillTime = currentTimeMillis;
-      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
+    if (0 < availableKillTaskSlots) {
+
+      // If no datasource has been specified, all are eligible for killing 
unused segments
+      if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+        dataSourcesToKill = 
segmentsMetadataManager.retrieveAllDataSourceNames();
+      }
+
+      final long currentTimeMillis = System.currentTimeMillis();
+      if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+        log.debug("No eligible datasource to kill unused segments.");
+      } else if (lastKillTime + period > currentTimeMillis) {

Review Comment:
   Should this check be pulled out earlier in the function? I.e., we could 
avoid the calculation altogether if the last kill was too recent. Not sure if 
I missed something.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.rpc.HttpResponseException;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * utilty methods that are useful for coordinator duties
+ */
+public class CoordinatorDutyUtils
+{
+
+  private static final Logger LOG = new Logger(CoordinatorDutyUtils.class);
+
+  /**
+   * Returns the total worker capacity in the cluster, including autoscaling, 
if enabled.
+   *
+   * @param overlordClient The overlord client used to get worker capacity 
info.
+   *
+   * @return the total worker capacity in the cluster, including autoscaling, 
if enabled.
+   */
+  public static int getTotalWorkerCapacity(@Nonnull final OverlordClient 
overlordClient)
+  {
+    int totalWorkerCapacity;
+    try {
+      final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
+          FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
+      totalWorkerCapacity = 
workerCapacityInfo.getMaximumCapacityWithAutoScale();
+      if (totalWorkerCapacity < 0) {
+        totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
+      }
+    }
+    catch (ExecutionException e) {
+      // Call to getTotalWorkerCapacity may fail during a rolling upgrade: API 
was added in 0.23.0.
+      if (e.getCause() instanceof HttpResponseException
+          && ((HttpResponseException) 
e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+        LOG.noStackTrace().warn(e, "Call to getTotalWorkerCapacity failed. 
Falling back to getWorkers.");
+        totalWorkerCapacity =
+            FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
+                .stream()
+                .mapToInt(worker -> worker.getWorker().getCapacity())
+                .sum();
+      } else {
+        throw new RuntimeException(e.getCause());
+      }
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+
+    return totalWorkerCapacity;
+  }
+
+  /**
+   * Return the number of active tasks that match the task predicate provided. 
The number of active tasks returned
+   * may be an overestimate, as tasks that return status's with null types 
will be conservatively counted to match the
+   * predicate provided.
+   *
+   * @param overlordClient The overlord client to use to retrieve the list of 
active tasks.
+   * @param taskPredicate  The predicate to match against the list of 
retreived task status.
+   *
+   * @return the number of active tasks that match the task predicate provided
+   */
+  public static List<TaskStatusPlus> getNumActiveTaskSlots(
+      @Nonnull final OverlordClient overlordClient,
+      final Predicate<TaskStatusPlus> taskPredicate
+  )
+  {
+    final CloseableIterator<TaskStatusPlus> activeTasks =
+        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0), 
true);
+    // Fetch currently running kill tasks
+    ImmutableList.Builder<TaskStatusPlus> taskStatuses = 
ImmutableList.builder();
+
+    try (final Closer closer = Closer.create()) {
+      closer.register(activeTasks);
+      while (activeTasks.hasNext()) {
+        final TaskStatusPlus status = activeTasks.next();
+
+        // taskType can be null if middleManagers are running with an older 
version. Here, we consevatively regard
+        // the tasks of the unknown taskType as the killTask. This is because 
it's important to not run
+        // killTasks more than the configured limit at any time which might 
impact to the ingestion
+        // performance.
+        if (status.getType() == null || (taskPredicate.apply(status))) {

Review Comment:
   I observe something unclear in the `null` check here:
   - First, we assume `status` is non-null, because we dereference it.
   - Then, we call the predicate; the predicate first checks whether `status` 
is null (when it can never be, since the earlier dereference would already 
have thrown an NPE).
   - Then we do the comparison.
   
   I think it would be a good idea to clarify in the Javadoc (and code) either 
one situation or the other:
   - the `Predicate<TaskStatusPlus>` "must be able to accept null", OR, 
the predicate "will never be called with null".



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java:
##########
@@ -87,6 +81,9 @@ public class CompactSegments implements CoordinatorCustomDuty
 
   private static final Logger LOG = new Logger(CompactSegments.class);
 
+  private static final Predicate<TaskStatusPlus> TASK_PREDICATE =

Review Comment:
   Can we name this `COMPACTION_TASK_PREDICATE`, or `IS_COMPACTION_TASK`?



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -112,69 +112,112 @@ public KillUnusedSegments(
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
   {
+
+    TaskStats taskStats = TaskStats.EMPTY;
     Collection<String> dataSourcesToKill =
         
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
     double killTaskSlotRatio = 
params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
     int maxKillTaskSlots = 
params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int availableKillTaskSlots = getAvailableKillTaskSlots(killTaskSlotRatio, 
maxKillTaskSlots);
-    if (0 == availableKillTaskSlots) {
-      log.debug("Not killing any unused segments because there are no 
available kill task slots at this time.");
-      return params;
-    }
+    int killTaskCapacity = getKillTaskCapacity(
+        CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
+        killTaskSlotRatio,
+        maxKillTaskSlots
+    );
+    int availableKillTaskSlots = getAvailableKillTaskSlots(
+        killTaskCapacity,
+        CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, 
TASK_PREDICATE).size()
+    );
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    // If no datasource has been specified, all are eligible for killing 
unused segments
-    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
-    }
+    taskStats.availableTaskSlots = availableKillTaskSlots;
+    taskStats.maxSlots = killTaskCapacity;
 
-    final long currentTimeMillis = System.currentTimeMillis();
-    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      log.debug("No eligible datasource to kill unused segments.");
-    } else if (lastKillTime + period > currentTimeMillis) {
-      log.debug("Skipping kill of unused segments as kill period has not 
elapsed yet.");
-    } else {
-      log.debug("Killing unused segments in datasources: %s", 
dataSourcesToKill);
-      lastKillTime = currentTimeMillis;
-      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
+    if (0 < availableKillTaskSlots) {
+
+      // If no datasource has been specified, all are eligible for killing 
unused segments
+      if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+        dataSourcesToKill = 
segmentsMetadataManager.retrieveAllDataSourceNames();
+      }
+
+      final long currentTimeMillis = System.currentTimeMillis();
+      if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+        log.debug("No eligible datasource to kill unused segments.");
+      } else if (lastKillTime + period > currentTimeMillis) {
+        log.debug("Skipping kill of unused segments as kill period has not 
elapsed yet.");
+      } else {
+        log.debug("Killing unused segments in datasources: %s", 
dataSourcesToKill);
+        lastKillTime = currentTimeMillis;
+        taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, 
availableKillTaskSlots);
+      }
     }
 
+    addStats(taskStats, stats);
     return params;
   }
 
-  private void killUnusedSegments(Collection<String> dataSourcesToKill, int 
availableKillTaskSlots)
+  private void addStats(
+      TaskStats taskStats,
+      CoordinatorRunStats stats
+  )
+  {
+    stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
+    stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
+    stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
+  }
+
+  private int killUnusedSegments(
+      Collection<String> dataSourcesToKill,
+      int availableKillTaskSlots
+  )
   {
     int submittedTasks = 0;
-    for (String dataSource : dataSourcesToKill) {
-      if (submittedTasks >= availableKillTaskSlots) {
-        log.info(StringUtils.format(
-            "Submitted [%d] kill tasks and reached kill task slot limit [%d]. 
Will resume "
-            + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
-        break;
-      }
-      final Interval intervalToKill = findIntervalForKill(dataSource);
-      if (intervalToKill == null) {
-        continue;
-      }
-
-      try {
-        FutureUtils.getUnchecked(overlordClient.runKillTask(
-            TASK_ID_PREFIX,
-            dataSource,
-            intervalToKill,
-            maxSegmentsToKill
-        ), true);
-        ++submittedTasks;
-      }
-      catch (Exception ex) {
-        log.error(ex, "Failed to submit kill task for dataSource [%s]", 
dataSource);
-        if (Thread.currentThread().isInterrupted()) {
-          log.warn("skipping kill task scheduling because thread is 
interrupted.");
+    if (0 < availableKillTaskSlots && 
!CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+      log.info("datasourcesToKill: %s", dataSourcesToKill);
+      for (String dataSource : dataSourcesToKill) {
+        if (submittedTasks >= availableKillTaskSlots) {
+          log.info(StringUtils.format(
+              "Submitted [%d] kill tasks and reached kill task slot limit 
[%d]. Will resume "

Review Comment:
   :+1: 



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java:
##########
@@ -152,9 +149,10 @@ public DruidCoordinatorRuntimeParams 
run(DruidCoordinatorRuntimeParams params)
 
     // Fetch currently running compaction tasks
     int busyCompactionTaskSlots = 0;
-    final CloseableIterator<TaskStatusPlus> activeTasks =
-        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0), 
true);
-    final List<TaskStatusPlus> compactionTasks = 
filterNonCompactionTasks(activeTasks);
+    final List<TaskStatusPlus> compactionTasks = 
CoordinatorDutyUtils.getNumActiveTaskSlots(

Review Comment:
   :+1: much cleaner



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to