suneet-s commented on code in PR #14782:
URL: https://github.com/apache/druid/pull/14782#discussion_r1289285105


##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.rpc.HttpResponseException;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Utility methods that are useful for coordinator duties
+ */
+public class CoordinatorDutyUtils
+{
+
+  private static final Logger LOG = new Logger(CoordinatorDutyUtils.class);
+
+  /**
+   * Returns the total worker capacity in the cluster, including autoscaling, 
if enabled.
+   *
+   * @param overlordClient The overlord client used to get worker capacity 
info.
+   *
+   * @return the total worker capacity in the cluster, including autoscaling, 
if enabled.
+   */
+  public static int getTotalWorkerCapacity(@Nonnull final OverlordClient 
overlordClient)
+  {
+    int totalWorkerCapacity;
+    try {
+      final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
+          FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
+      totalWorkerCapacity = 
workerCapacityInfo.getMaximumCapacityWithAutoScale();
+      if (totalWorkerCapacity < 0) {
+        totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
+      }
+    }
+    catch (ExecutionException e) {
+      // Call to getTotalWorkerCapacity may fail during a rolling upgrade: API 
was added in 0.23.0.

Review Comment:
   Thank you for the specificity in this comment!



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.rpc.HttpResponseException;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Utility methods that are useful for coordinator duties
+ */
+public class CoordinatorDutyUtils
+{
+
+  private static final Logger LOG = new Logger(CoordinatorDutyUtils.class);
+
+  /**
+   * Returns the total worker capacity in the cluster, including autoscaling, 
if enabled.
+   *
+   * @param overlordClient The overlord client used to get worker capacity 
info.
+   *
+   * @return the total worker capacity in the cluster, including autoscaling, 
if enabled.
+   */
+  public static int getTotalWorkerCapacity(@Nonnull final OverlordClient 
overlordClient)
+  {
+    int totalWorkerCapacity;
+    try {
+      final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
+          FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
+      totalWorkerCapacity = 
workerCapacityInfo.getMaximumCapacityWithAutoScale();
+      if (totalWorkerCapacity < 0) {
+        totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
+      }
+    }
+    catch (ExecutionException e) {
+      // Call to getTotalWorkerCapacity may fail during a rolling upgrade: API 
was added in 0.23.0.
+      if (e.getCause() instanceof HttpResponseException
+          && ((HttpResponseException) 
e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+        LOG.noStackTrace().warn(e, "Call to getTotalWorkerCapacity failed. 
Falling back to getWorkers.");
+        totalWorkerCapacity =
+            FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
+                .stream()
+                .mapToInt(worker -> worker.getWorker().getCapacity())
+                .sum();
+      } else {
+        throw new RuntimeException(e.getCause());
+      }
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+
+    return totalWorkerCapacity;
+  }
+
+  /**
+   * Return the number of active tasks that match the task predicate provided. 
The number of active tasks returned
may be an overestimate, as tasks that return statuses with null types 
will be conservatively counted to match the
+   * predicate provided.
+   *
+   * @param overlordClient The overlord client to use to retrieve the list of 
active tasks.
+   * @param taskPredicate  The predicate to match against the list of 
retrieved task status.
+   *
+   * @return the number of active tasks that match the task predicate provided
+   */
+  public static List<TaskStatusPlus> getNumActiveTaskSlots(
+      @Nonnull final OverlordClient overlordClient,
+      final Predicate<TaskStatusPlus> taskPredicate
+  )
+  {
+    final CloseableIterator<TaskStatusPlus> activeTasks =
+        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0), 
true);
+    // Fetch currently running kill tasks

Review Comment:
   ```suggestion
       // Fetch currently running tasks that match the predicate
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -112,69 +112,112 @@ public KillUnusedSegments(
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
   {
+
+    TaskStats taskStats = TaskStats.EMPTY;
     Collection<String> dataSourcesToKill =
         
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
     double killTaskSlotRatio = 
params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
     int maxKillTaskSlots = 
params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int availableKillTaskSlots = getAvailableKillTaskSlots(killTaskSlotRatio, 
maxKillTaskSlots);
-    if (0 == availableKillTaskSlots) {
-      log.debug("Not killing any unused segments because there are no 
available kill task slots at this time.");
-      return params;
-    }
+    int killTaskCapacity = getKillTaskCapacity(
+        CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
+        killTaskSlotRatio,
+        maxKillTaskSlots
+    );
+    int availableKillTaskSlots = getAvailableKillTaskSlots(
+        killTaskCapacity,
+        CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, 
TASK_PREDICATE).size()
+    );
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    // If no datasource has been specified, all are eligible for killing 
unused segments
-    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
-    }
+    taskStats.availableTaskSlots = availableKillTaskSlots;
+    taskStats.maxSlots = killTaskCapacity;
 
-    final long currentTimeMillis = System.currentTimeMillis();
-    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      log.debug("No eligible datasource to kill unused segments.");
-    } else if (lastKillTime + period > currentTimeMillis) {
-      log.debug("Skipping kill of unused segments as kill period has not 
elapsed yet.");
-    } else {
-      log.debug("Killing unused segments in datasources: %s", 
dataSourcesToKill);
-      lastKillTime = currentTimeMillis;
-      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
+    if (0 < availableKillTaskSlots) {
+
+      // If no datasource has been specified, all are eligible for killing 
unused segments
+      if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+        dataSourcesToKill = 
segmentsMetadataManager.retrieveAllDataSourceNames();
+      }
+
+      final long currentTimeMillis = System.currentTimeMillis();
+      if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+        log.debug("No eligible datasource to kill unused segments.");
+      } else if (lastKillTime + period > currentTimeMillis) {
+        log.debug("Skipping kill of unused segments as kill period has not 
elapsed yet.");
+      } else {
+        log.debug("Killing unused segments in datasources: %s", 
dataSourcesToKill);
+        lastKillTime = currentTimeMillis;
+        taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, 
availableKillTaskSlots);
+      }
     }
 
+    addStats(taskStats, stats);
     return params;
   }
 
-  private void killUnusedSegments(Collection<String> dataSourcesToKill, int 
availableKillTaskSlots)
+  private void addStats(
+      TaskStats taskStats,
+      CoordinatorRunStats stats
+  )
+  {
+    stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
+    stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
+    stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
+  }
+
+  private int killUnusedSegments(
+      Collection<String> dataSourcesToKill,
+      int availableKillTaskSlots
+  )
   {
     int submittedTasks = 0;
-    for (String dataSource : dataSourcesToKill) {
-      if (submittedTasks >= availableKillTaskSlots) {
-        log.info(StringUtils.format(
-            "Submitted [%d] kill tasks and reached kill task slot limit [%d]. 
Will resume "
-            + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
-        break;
-      }
-      final Interval intervalToKill = findIntervalForKill(dataSource);
-      if (intervalToKill == null) {
-        continue;
-      }
-
-      try {
-        FutureUtils.getUnchecked(overlordClient.runKillTask(
-            TASK_ID_PREFIX,
-            dataSource,
-            intervalToKill,
-            maxSegmentsToKill
-        ), true);
-        ++submittedTasks;
-      }
-      catch (Exception ex) {
-        log.error(ex, "Failed to submit kill task for dataSource [%s]", 
dataSource);
-        if (Thread.currentThread().isInterrupted()) {
-          log.warn("skipping kill task scheduling because thread is 
interrupted.");
+    if (0 < availableKillTaskSlots && 
!CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+      log.info("datasourcesToKill: %s", dataSourcesToKill);

Review Comment:
   ```suggestion
   ```
   
   This was debug logged earlier on line 148 - maybe we don't need this?



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -61,6 +58,9 @@ public class KillUnusedSegments implements CoordinatorDuty
 {
   public static final String KILL_TASK_TYPE = "kill";
   public static final String TASK_ID_PREFIX = "coordinator-issued";
+  public static final Predicate<TaskStatusPlus> TASK_PREDICATE =

Review Comment:
   If we choose to have a more specific name, I think it should say 
`IS_AUTO_KILL_TASK` instead of `IS_KILL_TASK` as the predicate excludes manual 
kill tasks



##########
server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java:
##########
@@ -141,6 +141,12 @@ public static class Kill
         = CoordinatorStat.toDebugAndEmit("killedAuditLogs", 
"metadata/kill/audit/count");
     public static final CoordinatorStat DATASOURCES
         = CoordinatorStat.toDebugAndEmit("killedDatasources", 
"metadata/kill/datasource/count");
+    public static final CoordinatorStat AVAILABLE_SLOTS
+        = CoordinatorStat.toDebugAndEmit("killAvlSlots", 
"killTask/availableSlot/count");

Review Comment:
   ```suggestion
           = CoordinatorStat.toDebugAndEmit("killAvailSlots", 
"killTask/availableSlot/count");
   ```
   
   nit: I did a hot take when I read Avl and wasn't sure what that stood for. 
Maybe Avail is clear and still a useful short name?



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -112,69 +112,112 @@ public KillUnusedSegments(
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
   {
+
+    TaskStats taskStats = TaskStats.EMPTY;

Review Comment:
   hmm I think this isn't quite right. It looks like you're re-using the same 
EMPTY object across coordinator runs. So if in one run submittedTasks is set to 
10, and the next run there was no capacity, submittedTasks would still be set 
to 10.
   
   I think it would be easier to follow if you just removed the constant 
`EMPTY` and created a new object across each run. 
   
   You could also consider whether there is enough benefit in using an object 
here since it's fields are being set in 2 different places - we could just call 
`stats.addStat(...)` on lines 132, 133 and 150 (and in the else block for `if 
(0 < availableKillTaskSlots)`)



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -199,86 +242,33 @@ private Interval findIntervalForKill(String dataSource)
     }
   }
 
-  private int getAvailableKillTaskSlots(double killTaskSlotRatio, int 
maxKillTaskSlots)
+  private int getAvailableKillTaskSlots(int killTaskCapacity, int 
numActiveKillTasks)
   {
     return Math.max(
         0,
-        getKillTaskCapacity(getTotalWorkerCapacity(), killTaskSlotRatio, 
maxKillTaskSlots) - getNumActiveKillTaskSlots()
+        killTaskCapacity - numActiveKillTasks
     );
   }
 
-  /**
-   * Get the number of active kill task slots in use. The kill tasks counted, 
are only those that are submitted
-   * by this coordinator duty (have prefix {@link 
KillUnusedSegments#TASK_ID_PREFIX}. The value returned here
-   * may be an overestimate, as in some cases the taskType can be null if 
middleManagers are running with an older
-   * version, and these tasks are counted as active kill tasks to be safe.
-   * @return
-   */
-  private int getNumActiveKillTaskSlots()
+  @VisibleForTesting
+  static int getKillTaskCapacity(int totalWorkerCapacity, double 
killTaskSlotRatio, int maxKillTaskSlots)
   {
-    final CloseableIterator<TaskStatusPlus> activeTasks =
-        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0), 
true);
-    // Fetch currently running kill tasks
-    int numActiveKillTasks = 0;
-
-    try (final Closer closer = Closer.create()) {
-      closer.register(activeTasks);
-      while (activeTasks.hasNext()) {
-        final TaskStatusPlus status = activeTasks.next();
-
-        // taskType can be null if middleManagers are running with an older 
version. Here, we conservatively regard
-        // the tasks of the unknown taskType as the killTask. This is because 
it's important to not run
-        // killTasks more than the configured limit at any time which might 
impact to the ingestion
-        // performance.
-        if (status.getType() == null
-            || (KILL_TASK_TYPE.equals(status.getType()) && 
status.getId().startsWith(TASK_ID_PREFIX))) {
-          numActiveKillTasks++;
-        }
-      }
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    return numActiveKillTasks;
+    return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 
1.0)), maxKillTaskSlots);
   }
 
-  private int getTotalWorkerCapacity()
+  static class TaskStats
   {
-    int totalWorkerCapacity;
-    try {
-      final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
-          FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
-      totalWorkerCapacity = 
workerCapacityInfo.getMaximumCapacityWithAutoScale();
-      if (totalWorkerCapacity < 0) {
-        totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
-      }
-    }
-    catch (ExecutionException e) {
-      // Call to getTotalWorkerCapacity may fail during a rolling upgrade: API 
was added in 0.23.0.
-      if (e.getCause() instanceof HttpResponseException
-          && ((HttpResponseException) 
e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
-        log.noStackTrace().warn(e, "Call to getTotalWorkerCapacity failed. 
Falling back to getWorkers.");
-        totalWorkerCapacity =
-            FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
-                .stream()
-                .mapToInt(worker -> worker.getWorker().getCapacity())
-                .sum();
-      } else {
-        throw new RuntimeException(e.getCause());
-      }
-    }
-    catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    }
 
-    return totalWorkerCapacity;
-  }
+    static final TaskStats EMPTY = new TaskStats();
+    int availableTaskSlots;
+    int maxSlots;
+    int submittedTasks;
 
-  @VisibleForTesting
-  static int getKillTaskCapacity(int totalWorkerCapacity, double 
killTaskSlotRatio, int maxKillTaskSlots)
-  {
-    return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 
1.0)), maxKillTaskSlots);
+    TaskStats()
+    {
+      availableTaskSlots = 0;
+      maxSlots = 0;
+      submittedTasks = 0;
+    }

Review Comment:
   See comment above about considering to get rid of this class



##########
server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java:
##########
@@ -141,6 +141,12 @@ public static class Kill
         = CoordinatorStat.toDebugAndEmit("killedAuditLogs", 
"metadata/kill/audit/count");
     public static final CoordinatorStat DATASOURCES
         = CoordinatorStat.toDebugAndEmit("killedDatasources", 
"metadata/kill/datasource/count");
+    public static final CoordinatorStat AVAILABLE_SLOTS
+        = CoordinatorStat.toDebugAndEmit("killAvlSlots", 
"killTask/availableSlot/count");
+    public static final CoordinatorStat MAX_SLOTS
+        = CoordinatorStat.toDebugAndEmit("killtMaxSlots", 
"killTask/maxSlot/count");

Review Comment:
   ```suggestion
           = CoordinatorStat.toDebugAndEmit("killMaxSlots", 
"killTask/maxSlot/count");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to