This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 82d82dfbd6 Add stats to KillUnusedSegments coordinator duty (#14782)
82d82dfbd6 is described below

commit 82d82dfbd6af5e6ed83d0c663c55dc538d6b1d9e
Author: zachjsh <[email protected]>
AuthorDate: Thu Aug 10 18:36:53 2023 -0400

    Add stats to KillUnusedSegments coordinator duty (#14782)
    
    ### Description
    
    Added the following metrics, which are calculated from the 
`KillUnusedSegments` coordinatorDuty
    
    `"killTask/availableSlot/count"`: calculates the number of remaining task 
slots available for auto kill
    `"killTask/maxSlot/count"`: calculates the maximum number of tasks 
available for auto kill
    `"kill/task/count"`: calculates the number of tasks submitted by auto 
kill.
    
    #### Release note
    NEW: metrics added for auto kill
    
    `"killTask/availableSlot/count"`: calculates the number of remaining task 
slots available for auto kill
    `"killTask/maxSlot/count"`: calculates the maximum number of tasks 
available for auto kill
    `"kill/task/count"`: calculates the number of tasks submitted by auto 
kill.
---
 docs/operations/metrics.md                         |   3 +
 .../main/resources/defaultMetricDimensions.json    |   6 +-
 .../server/coordinator/duty/CompactSegments.java   |  73 +------
 .../coordinator/duty/CoordinatorDutyUtils.java     | 129 ++++++++++++
 .../coordinator/duty/KillUnusedSegments.java       | 227 ++++++++++-----------
 .../druid/server/coordinator/stats/Stats.java      |   6 +
 .../coordinator/duty/KillUnusedSegmentsTest.java   |  25 +++
 7 files changed, 284 insertions(+), 185 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 6383a4057c..8b809721b9 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -320,6 +320,9 @@ These metrics are for the Druid Coordinator and are reset 
each time the Coordina
 |`compact/task/count`|Number of tasks issued in the auto compaction run.| 
|Varies|
 |`compactTask/maxSlot/count`|Maximum number of task slots available for auto 
compaction tasks in the auto compaction run.| |Varies|
 |`compactTask/availableSlot/count`|Number of available task slots that can be 
used for auto compaction tasks in the auto compaction run. This is the max 
number of task slots minus any currently running compaction tasks.| |Varies|
+|`killTask/availableSlot/count`| Number of available task slots that can be 
used for auto kill tasks in the auto kill run. This is the max number of task 
slots minus any currently running auto kill tasks.                              
                                                                                
                                                                                
                                                                                
                   [...]
+|`killTask/maxSlot/count`| Maximum number of task slots available for auto 
kill tasks in the auto kill run.                                                
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+|`kill/task/count`| Number of tasks issued in the auto kill run.               
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 |`segment/waitCompact/bytes`|Total bytes of this datasource waiting to be 
compacted by the auto compaction (only consider intervals/segments that are 
eligible for auto compaction).|`dataSource`|Varies|
 |`segment/waitCompact/count`|Total number of segments of this datasource 
waiting to be compacted by the auto compaction (only consider 
intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
 |`interval/waitCompact/count`|Total number of intervals of this datasource 
waiting to be compacted by the auto compaction (only consider 
intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
diff --git 
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
 
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index 9b134cc542..b5c5f8b1ce 100644
--- 
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ 
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -175,5 +175,9 @@
   "namespace/cache/numEntries" : { "dimensions" : [], "type" : "gauge" },
   "namespace/cache/heapSizeInBytes" : { "dimensions" : [], "type" : "gauge" },
 
-  "service/heartbeat" : { "dimensions" : ["leader"], "type" : "count" }
+  "service/heartbeat" : { "dimensions" : ["leader"], "type" : "count" },
+
+  "killTask/availableSlot/count" : { "dimensions" : [], "type" : "count" },
+  "killTask/maxSlot/count" : { "dimensions" : [], "type" : "count" },
+  "killTask/task/count" : { "dimensions" : [], "type" : "count" }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 4dd7cb5dd7..71495cac12 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
 import com.google.inject.Inject;
 import org.apache.druid.client.indexing.ClientCompactionIOConfig;
 import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
@@ -32,7 +33,6 @@ import 
org.apache.druid.client.indexing.ClientCompactionTaskQuery;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
 import org.apache.druid.client.indexing.ClientTaskQuery;
-import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
 import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.common.utils.IdUtils;
@@ -41,11 +41,8 @@ import 
org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.GranularityType;
-import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.rpc.HttpResponseException;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.CompactionStatistics;
@@ -59,17 +56,14 @@ import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -87,6 +81,9 @@ public class CompactSegments implements CoordinatorCustomDuty
 
   private static final Logger LOG = new Logger(CompactSegments.class);
 
+  private static final Predicate<TaskStatusPlus> IS_COMPACTION_TASK =
+      status -> null != status && 
COMPACTION_TASK_TYPE.equals(status.getType());
+
   private final CompactionSegmentSearchPolicy policy;
   private final boolean skipLockedIntervals;
   private final OverlordClient overlordClient;
@@ -152,9 +149,10 @@ public class CompactSegments implements 
CoordinatorCustomDuty
 
     // Fetch currently running compaction tasks
     int busyCompactionTaskSlots = 0;
-    final CloseableIterator<TaskStatusPlus> activeTasks =
-        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0), 
true);
-    final List<TaskStatusPlus> compactionTasks = 
filterNonCompactionTasks(activeTasks);
+    final List<TaskStatusPlus> compactionTasks = 
CoordinatorDutyUtils.getNumActiveTaskSlots(
+        overlordClient,
+        IS_COMPACTION_TASK
+    );
     for (TaskStatusPlus status : compactionTasks) {
       final TaskPayloadResponse response =
           FutureUtils.getUnchecked(overlordClient.taskPayload(status.getId()), 
true);
@@ -336,62 +334,9 @@ public class CompactSegments implements 
CoordinatorCustomDuty
     return tuningConfig.getPartitionsSpec() instanceof 
DimensionRangePartitionsSpec;
   }
 
-  private static List<TaskStatusPlus> 
filterNonCompactionTasks(CloseableIterator<TaskStatusPlus> taskStatuses)
-  {
-    final List<TaskStatusPlus> retVal = new ArrayList<>();
-
-    try (final Closer closer = Closer.create()) {
-      closer.register(taskStatuses);
-      while (taskStatuses.hasNext()) {
-        final TaskStatusPlus status = taskStatuses.next();
-
-        // taskType can be null if middleManagers are running with an older 
version. Here, we consevatively regard
-        // the tasks of the unknown taskType as the compactionTask. This is 
because it's important to not run
-        // compactionTasks more than the configured limit at any time which 
might impact to the ingestion
-        // performance.
-        if (status.getType() == null || 
COMPACTION_TASK_TYPE.equals(status.getType())) {
-          retVal.add(status);
-        }
-      }
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    return retVal;
-  }
-
   private int getCompactionTaskCapacity(CoordinatorCompactionConfig 
dynamicConfig)
   {
-    int totalWorkerCapacity;
-    try {
-      final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
-          FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
-
-      if (dynamicConfig.isUseAutoScaleSlots() && 
workerCapacityInfo.getMaximumCapacityWithAutoScale() > 0) {
-        totalWorkerCapacity = 
workerCapacityInfo.getMaximumCapacityWithAutoScale();
-      } else {
-        totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
-      }
-    }
-    catch (ExecutionException e) {
-      // Call to getTotalWorkerCapacity may fail during a rolling upgrade: API 
was added in 0.23.0.
-      if (e.getCause() instanceof HttpResponseException
-          && ((HttpResponseException) 
e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
-        LOG.noStackTrace().warn(e, "Call to getTotalWorkerCapacity failed. 
Falling back to getWorkers.");
-        totalWorkerCapacity =
-            FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
-                       .stream()
-                       .mapToInt(worker -> worker.getWorker().getCapacity())
-                       .sum();
-      } else {
-        throw new RuntimeException(e.getCause());
-      }
-    }
-    catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    }
+    int totalWorkerCapacity = 
CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient);
 
     return Math.min(
         (int) (totalWorkerCapacity * 
dynamicConfig.getCompactionTaskSlotRatio()),
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java
new file mode 100644
index 0000000000..f6f31173fa
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.rpc.HttpResponseException;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * utilty methods that are useful for coordinator duties
+ */
+public class CoordinatorDutyUtils
+{
+
+  private static final Logger LOG = new Logger(CoordinatorDutyUtils.class);
+
+  /**
+   * Returns the total worker capacity in the cluster, including autoscaling, 
if enabled.
+   *
+   * @param overlordClient The overlord client used to get worker capacity 
info.
+   *
+   * @return the total worker capacity in the cluster, including autoscaling, 
if enabled.
+   */
+  public static int getTotalWorkerCapacity(@Nonnull final OverlordClient 
overlordClient)
+  {
+    int totalWorkerCapacity;
+    try {
+      final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
+          FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
+      totalWorkerCapacity = 
workerCapacityInfo.getMaximumCapacityWithAutoScale();
+      if (totalWorkerCapacity < 0) {
+        totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
+      }
+    }
+    catch (ExecutionException e) {
+      // Call to getTotalWorkerCapacity may fail during a rolling upgrade: API 
was added in 0.23.0.
+      if (e.getCause() instanceof HttpResponseException
+          && ((HttpResponseException) 
e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+        LOG.noStackTrace().warn(e, "Call to getTotalWorkerCapacity failed. 
Falling back to getWorkers.");
+        totalWorkerCapacity =
+            FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
+                .stream()
+                .mapToInt(worker -> worker.getWorker().getCapacity())
+                .sum();
+      } else {
+        throw new RuntimeException(e.getCause());
+      }
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+
+    return totalWorkerCapacity;
+  }
+
+  /**
+   * Return the number of active tasks that match the task predicate provided. 
The number of active tasks returned
+   * may be an overestimate, as tasks that return status's with null types 
will be conservatively counted to match the
+   * predicate provided.
+   *
+   * @param overlordClient The overlord client to use to retrieve the list of 
active tasks.
+   * @param taskPredicate  The predicate to match against the list of 
retreived task status.
+   *                       This predicate will never be called with a null 
task status.
+   *
+   * @return the number of active tasks that match the task predicate provided
+   */
+  public static List<TaskStatusPlus> getNumActiveTaskSlots(
+      @Nonnull final OverlordClient overlordClient,
+      final Predicate<TaskStatusPlus> taskPredicate
+  )
+  {
+    final CloseableIterator<TaskStatusPlus> activeTasks =
+        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0), 
true);
+    // Fetch currently running tasks that match the predicate
+    List<TaskStatusPlus> taskStatuses = new ArrayList<>();
+
+    try (final Closer closer = Closer.create()) {
+      closer.register(activeTasks);
+      while (activeTasks.hasNext()) {
+        final TaskStatusPlus status = activeTasks.next();
+
+        // taskType can be null if middleManagers are running with an older 
version. Here, we consevatively regard
+        // the tasks of the unknown taskType as the killTask. This is because 
it's important to not run
+        // killTasks more than the configured limit at any time which might 
impact to the ingestion
+        // performance.
+        if (null != status && (null == status.getType() || 
(taskPredicate.apply(status)))) {
+          taskStatuses.add(status);
+        }
+      }
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return taskStatuses;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 97bd2ab388..bbd58bfe63 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -21,32 +21,29 @@ package org.apache.druid.server.coordinator.duty;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
-import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.metadata.SegmentsMetadataManager;
-import org.apache.druid.rpc.HttpResponseException;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.utils.CollectionUtils;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 /**
  * Completely removes information about unused segments who have an interval 
end that comes before
@@ -61,6 +58,9 @@ public class KillUnusedSegments implements CoordinatorDuty
 {
   public static final String KILL_TASK_TYPE = "kill";
   public static final String TASK_ID_PREFIX = "coordinator-issued";
+  public static final Predicate<TaskStatusPlus> IS_AUTO_KILL_TASK =
+      status -> null != status
+                && (KILL_TASK_TYPE.equals(status.getType()) && 
status.getId().startsWith(TASK_ID_PREFIX));
   private static final Logger log = new Logger(KillUnusedSegments.class);
 
   private final long period;
@@ -112,69 +112,111 @@ public class KillUnusedSegments implements 
CoordinatorDuty
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
   {
+
+    final long currentTimeMillis = System.currentTimeMillis();
+    if (lastKillTime + period > currentTimeMillis) {
+      log.debug("Skipping kill of unused segments as kill period has not 
elapsed yet.");
+      return params;
+    }
+    TaskStats taskStats = new TaskStats();
     Collection<String> dataSourcesToKill =
         
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
     double killTaskSlotRatio = 
params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
     int maxKillTaskSlots = 
params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int availableKillTaskSlots = getAvailableKillTaskSlots(killTaskSlotRatio, 
maxKillTaskSlots);
-    if (0 == availableKillTaskSlots) {
-      log.debug("Not killing any unused segments because there are no 
available kill task slots at this time.");
-      return params;
-    }
+    int killTaskCapacity = getKillTaskCapacity(
+        CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
+        killTaskSlotRatio,
+        maxKillTaskSlots
+    );
+    int availableKillTaskSlots = getAvailableKillTaskSlots(
+        killTaskCapacity,
+        CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, 
IS_AUTO_KILL_TASK).size()
+    );
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    // If no datasource has been specified, all are eligible for killing 
unused segments
-    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
-    }
+    taskStats.availableTaskSlots = availableKillTaskSlots;
+    taskStats.maxSlots = killTaskCapacity;
+
+    if (0 < availableKillTaskSlots) {
+      // If no datasource has been specified, all are eligible for killing 
unused segments
+      if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+        dataSourcesToKill = 
segmentsMetadataManager.retrieveAllDataSourceNames();
+      }
 
-    final long currentTimeMillis = System.currentTimeMillis();
-    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      log.debug("No eligible datasource to kill unused segments.");
-    } else if (lastKillTime + period > currentTimeMillis) {
-      log.debug("Skipping kill of unused segments as kill period has not 
elapsed yet.");
-    } else {
       log.debug("Killing unused segments in datasources: %s", 
dataSourcesToKill);
       lastKillTime = currentTimeMillis;
-      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
+      taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, 
availableKillTaskSlots);
+
     }
 
+    addStats(taskStats, stats);
     return params;
   }
 
-  private void killUnusedSegments(Collection<String> dataSourcesToKill, int 
availableKillTaskSlots)
+  private void addStats(
+      TaskStats taskStats,
+      CoordinatorRunStats stats
+  )
   {
-    int submittedTasks = 0;
-    for (String dataSource : dataSourcesToKill) {
-      if (submittedTasks >= availableKillTaskSlots) {
-        log.info(StringUtils.format(
-            "Submitted [%d] kill tasks and reached kill task slot limit [%d]. 
Will resume "
-            + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
-        break;
-      }
-      final Interval intervalToKill = findIntervalForKill(dataSource);
-      if (intervalToKill == null) {
-        continue;
-      }
+    stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
+    stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
+    stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
+  }
 
-      try {
-        FutureUtils.getUnchecked(overlordClient.runKillTask(
-            TASK_ID_PREFIX,
-            dataSource,
-            intervalToKill,
-            maxSegmentsToKill
-        ), true);
-        ++submittedTasks;
-      }
-      catch (Exception ex) {
-        log.error(ex, "Failed to submit kill task for dataSource [%s]", 
dataSource);
-        if (Thread.currentThread().isInterrupted()) {
-          log.warn("skipping kill task scheduling because thread is 
interrupted.");
+  private int killUnusedSegments(
+      Collection<String> dataSourcesToKill,
+      int availableKillTaskSlots
+  )
+  {
+    int submittedTasks = 0;
+    if (0 < availableKillTaskSlots && 
!CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+      for (String dataSource : dataSourcesToKill) {
+        if (submittedTasks >= availableKillTaskSlots) {
+          log.info(StringUtils.format(
+              "Submitted [%d] kill tasks and reached kill task slot limit 
[%d]. Will resume "
+              + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
           break;
         }
+        final Interval intervalToKill = findIntervalForKill(dataSource);
+        if (intervalToKill == null) {
+          continue;
+        }
+
+        try {
+          FutureUtils.getUnchecked(overlordClient.runKillTask(
+              TASK_ID_PREFIX,
+              dataSource,
+              intervalToKill,
+              maxSegmentsToKill
+          ), true);
+          ++submittedTasks;
+        }
+        catch (Exception ex) {
+          log.error(ex, "Failed to submit kill task for dataSource [%s]", 
dataSource);
+          if (Thread.currentThread().isInterrupted()) {
+            log.warn("skipping kill task scheduling because thread is 
interrupted.");
+            break;
+          }
+        }
       }
     }
 
-    log.debug("Submitted [%d] kill tasks for [%d] datasources.", 
submittedTasks, dataSourcesToKill.size());
+    if (log.isDebugEnabled()) {
+      log.debug(
+          "Submitted [%d] kill tasks for [%d] datasources.%s",
+          submittedTasks,
+          dataSourcesToKill.size(),
+          availableKillTaskSlots < dataSourcesToKill.size()
+              ? StringUtils.format(
+              " Datasources skipped: %s",
+              ImmutableList.copyOf(dataSourcesToKill).subList(submittedTasks, 
dataSourcesToKill.size())
+          )
+              : ""
+      );
+    }
+
+    // report stats
+    return submittedTasks;
   }
 
   /**
@@ -199,86 +241,31 @@ public class KillUnusedSegments implements CoordinatorDuty
     }
   }
 
-  private int getAvailableKillTaskSlots(double killTaskSlotRatio, int 
maxKillTaskSlots)
+  private int getAvailableKillTaskSlots(int killTaskCapacity, int 
numActiveKillTasks)
   {
     return Math.max(
         0,
-        getKillTaskCapacity(getTotalWorkerCapacity(), killTaskSlotRatio, 
maxKillTaskSlots) - getNumActiveKillTaskSlots()
+        killTaskCapacity - numActiveKillTasks
     );
   }
 
-  /**
-   * Get the number of active kill task slots in use. The kill tasks counted, 
are only those thare are submitted
-   * by this coordinator duty (have prefix {@link 
KillUnusedSegments#TASK_ID_PREFIX}. The value returned here
-   * may be an overestimate, as in some cased the taskType can be null if 
middleManagers are running with an older
-   * version, and these tasks are counted as active kill tasks to be safe.
-   * @return
-   */
-  private int getNumActiveKillTaskSlots()
+  @VisibleForTesting
+  static int getKillTaskCapacity(int totalWorkerCapacity, double 
killTaskSlotRatio, int maxKillTaskSlots)
   {
-    final CloseableIterator<TaskStatusPlus> activeTasks =
-        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0), 
true);
-    // Fetch currently running kill tasks
-    int numActiveKillTasks = 0;
-
-    try (final Closer closer = Closer.create()) {
-      closer.register(activeTasks);
-      while (activeTasks.hasNext()) {
-        final TaskStatusPlus status = activeTasks.next();
-
-        // taskType can be null if middleManagers are running with an older 
version. Here, we consevatively regard
-        // the tasks of the unknown taskType as the killTask. This is because 
it's important to not run
-        // killTasks more than the configured limit at any time which might 
impact to the ingestion
-        // performance.
-        if (status.getType() == null
-            || (KILL_TASK_TYPE.equals(status.getType()) && 
status.getId().startsWith(TASK_ID_PREFIX))) {
-          numActiveKillTasks++;
-        }
-      }
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    return numActiveKillTasks;
+    return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 
1.0)), maxKillTaskSlots);
   }
 
-  private int getTotalWorkerCapacity()
+  static class TaskStats
   {
-    int totalWorkerCapacity;
-    try {
-      final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
-          FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
-      totalWorkerCapacity = 
workerCapacityInfo.getMaximumCapacityWithAutoScale();
-      if (totalWorkerCapacity < 0) {
-        totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
-      }
-    }
-    catch (ExecutionException e) {
-      // Call to getTotalWorkerCapacity may fail during a rolling upgrade: API 
was added in 0.23.0.
-      if (e.getCause() instanceof HttpResponseException
-          && ((HttpResponseException) 
e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
-        log.noStackTrace().warn(e, "Call to getTotalWorkerCapacity failed. 
Falling back to getWorkers.");
-        totalWorkerCapacity =
-            FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
-                .stream()
-                .mapToInt(worker -> worker.getWorker().getCapacity())
-                .sum();
-      } else {
-        throw new RuntimeException(e.getCause());
-      }
-    }
-    catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    }
+    int availableTaskSlots;
+    int maxSlots;
+    int submittedTasks;
 
-    return totalWorkerCapacity;
-  }
-
-  @VisibleForTesting
-  static int getKillTaskCapacity(int totalWorkerCapacity, double 
killTaskSlotRatio, int maxKillTaskSlots)
-  {
-    return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 
1.0)), maxKillTaskSlots);
+    TaskStats()
+    {
+      availableTaskSlots = 0;
+      maxSlots = 0;
+      submittedTasks = 0;
+    }
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
index ac37767327..2f97972dc7 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
@@ -141,6 +141,12 @@ public class Stats
         = CoordinatorStat.toDebugAndEmit("killedAuditLogs", "metadata/kill/audit/count");
     public static final CoordinatorStat DATASOURCES
         = CoordinatorStat.toDebugAndEmit("killedDatasources", "metadata/kill/datasource/count");
+    public static final CoordinatorStat AVAILABLE_SLOTS
+        = CoordinatorStat.toDebugAndEmit("killAvailSlots", "killTask/availableSlot/count");
+    public static final CoordinatorStat MAX_SLOTS
+        = CoordinatorStat.toDebugAndEmit("killMaxSlots", "killTask/maxSlot/count");
+    public static final CoordinatorStat SUBMITTED_TASKS
+        = CoordinatorStat.toDebugAndEmit("killTasks", "kill/task/count");
   }
 
   public static class Balancer
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index e67063fb7b..5d0c81385c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -33,6 +33,8 @@ import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
@@ -76,6 +78,8 @@ public class KillUnusedSegmentsTest
   @Mock(answer = Answers.RETURNS_DEEP_STUBS)
   private DruidCoordinatorConfig config;
 
+  @Mock
+  private CoordinatorRunStats stats;
   @Mock
   private DruidCoordinatorRuntimeParams params;
   @Mock
@@ -94,6 +98,7 @@ public class KillUnusedSegmentsTest
   public void setup()
   {
     Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
+    Mockito.doReturn(stats).when(params).getCoordinatorStats();
     Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
     Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
     Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
@@ -181,6 +186,7 @@ public class KillUnusedSegmentsTest
     );
     mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     runAndVerifyKillInterval(expectedKillInterval);
+    verifyStats(9, 1, 10);
   }
 
   @Test
@@ -198,6 +204,7 @@ public class KillUnusedSegmentsTest
     );
     mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     runAndVerifyKillInterval(expectedKillInterval);
+    verifyStats(9, 1, 10);
   }
 
   @Test
@@ -214,6 +221,7 @@ public class KillUnusedSegmentsTest
     );
     mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     runAndVerifyKillInterval(expectedKillInterval);
+    verifyStats(9, 1, 10);
   }
 
   @Test
@@ -226,6 +234,7 @@ public class KillUnusedSegmentsTest
     mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     // Only 1 unused segment is killed
     runAndVerifyKillInterval(yearOldSegment.getInterval());
+    verifyStats(9, 1, 10);
   }
 
   @Test
@@ -233,6 +242,7 @@ public class KillUnusedSegmentsTest
   {
     mockTaskSlotUsage(0.10, 10, 1, 5);
     runAndVerifyNoKill();
+    verifyStats(0, 0, 0);
   }
 
   @Test
@@ -279,7 +289,15 @@ public class KillUnusedSegmentsTest
   private void runAndVerifyKillInterval(Interval expectedKillInterval)
   {
     int limit = config.getCoordinatorKillMaxSegments();
+    Mockito.doReturn(Futures.immediateFuture("ok"))
+        .when(overlordClient)
+        .runKillTask(
+            ArgumentMatchers.anyString(),
+            ArgumentMatchers.anyString(),
+            ArgumentMatchers.any(Interval.class),
+            ArgumentMatchers.anyInt());
     target.run(params);
+
     Mockito.verify(overlordClient, Mockito.times(1)).runKillTask(
         ArgumentMatchers.anyString(),
         ArgumentMatchers.eq("DS1"),
@@ -288,6 +306,13 @@ public class KillUnusedSegmentsTest
     );
   }
 
+  private void verifyStats(int availableSlots, int submittedTasks, int maxSlots)
+  {
+    Mockito.verify(stats).add(Stats.Kill.AVAILABLE_SLOTS, availableSlots);
+    Mockito.verify(stats).add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
+    Mockito.verify(stats).add(Stats.Kill.MAX_SLOTS, maxSlots);
+  }
+
   private void runAndVerifyNoKill()
   {
     target.run(params);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to