This is an automated email from the ASF dual-hosted git repository.
zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 82d82dfbd6 Add stats to KillUnusedSegments coordinator duty (#14782)
82d82dfbd6 is described below
commit 82d82dfbd6af5e6ed83d0c663c55dc538d6b1d9e
Author: zachjsh <[email protected]>
AuthorDate: Thu Aug 10 18:36:53 2023 -0400
Add stats to KillUnusedSegments coordinator duty (#14782)
### Description
Added the following metrics, which are calculated by the
`KillUnusedSegments` coordinator duty:
`"killTask/availableSlot/count"`: the number of remaining task
slots available for auto kill
`"killTask/maxSlot/count"`: the maximum number of task slots
available for auto kill
`"kill/task/count"`: the number of tasks submitted by auto
kill.
#### Release note
NEW: metrics added for auto kill
`"killTask/availableSlot/count"`: the number of remaining task
slots available for auto kill
`"killTask/maxSlot/count"`: the maximum number of task slots
available for auto kill
`"kill/task/count"`: the number of tasks submitted by auto
kill.
---
docs/operations/metrics.md | 3 +
.../main/resources/defaultMetricDimensions.json | 6 +-
.../server/coordinator/duty/CompactSegments.java | 73 +------
.../coordinator/duty/CoordinatorDutyUtils.java | 129 ++++++++++++
.../coordinator/duty/KillUnusedSegments.java | 227 ++++++++++-----------
.../druid/server/coordinator/stats/Stats.java | 6 +
.../coordinator/duty/KillUnusedSegmentsTest.java | 25 +++
7 files changed, 284 insertions(+), 185 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 6383a4057c..8b809721b9 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -320,6 +320,9 @@ These metrics are for the Druid Coordinator and are reset
each time the Coordina
|`compact/task/count`|Number of tasks issued in the auto compaction run.|
|Varies|
|`compactTask/maxSlot/count`|Maximum number of task slots available for auto
compaction tasks in the auto compaction run.| |Varies|
|`compactTask/availableSlot/count`|Number of available task slots that can be
used for auto compaction tasks in the auto compaction run. This is the max
number of task slots minus any currently running compaction tasks.| |Varies|
+|`killTask/availableSlot/count`| Number of available task slots that can be
used for auto kill tasks in the auto kill run. This is the max number of task
slots minus any currently running auto kill tasks.
[...]
+|`killTask/maxSlot/count`| Maximum number of task slots available for auto
kill tasks in the auto kill run.
[...]
+|`kill/task/count`| Number of tasks issued in the auto kill run.
[...]
|`segment/waitCompact/bytes`|Total bytes of this datasource waiting to be
compacted by the auto compaction (only consider intervals/segments that are
eligible for auto compaction).|`dataSource`|Varies|
|`segment/waitCompact/count`|Total number of segments of this datasource
waiting to be compacted by the auto compaction (only consider
intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
|`interval/waitCompact/count`|Total number of intervals of this datasource
waiting to be compacted by the auto compaction (only consider
intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
diff --git
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index 9b134cc542..b5c5f8b1ce 100644
---
a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++
b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -175,5 +175,9 @@
"namespace/cache/numEntries" : { "dimensions" : [], "type" : "gauge" },
"namespace/cache/heapSizeInBytes" : { "dimensions" : [], "type" : "gauge" },
- "service/heartbeat" : { "dimensions" : ["leader"], "type" : "count" }
+ "service/heartbeat" : { "dimensions" : ["leader"], "type" : "count" },
+
+ "killTask/availableSlot/count" : { "dimensions" : [], "type" : "count" },
+ "killTask/maxSlot/count" : { "dimensions" : [], "type" : "count" },
+ "killTask/task/count" : { "dimensions" : [], "type" : "count" }
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 4dd7cb5dd7..71495cac12 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
@@ -32,7 +33,6 @@ import
org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.client.indexing.ClientTaskQuery;
-import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.common.utils.IdUtils;
@@ -41,11 +41,8 @@ import
org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
-import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CompactionStatistics;
@@ -59,17 +56,14 @@ import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Interval;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -87,6 +81,9 @@ public class CompactSegments implements CoordinatorCustomDuty
private static final Logger LOG = new Logger(CompactSegments.class);
+ private static final Predicate<TaskStatusPlus> IS_COMPACTION_TASK =
+ status -> null != status &&
COMPACTION_TASK_TYPE.equals(status.getType());
+
private final CompactionSegmentSearchPolicy policy;
private final boolean skipLockedIntervals;
private final OverlordClient overlordClient;
@@ -152,9 +149,10 @@ public class CompactSegments implements
CoordinatorCustomDuty
// Fetch currently running compaction tasks
int busyCompactionTaskSlots = 0;
- final CloseableIterator<TaskStatusPlus> activeTasks =
- FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0),
true);
- final List<TaskStatusPlus> compactionTasks =
filterNonCompactionTasks(activeTasks);
+ final List<TaskStatusPlus> compactionTasks =
CoordinatorDutyUtils.getNumActiveTaskSlots(
+ overlordClient,
+ IS_COMPACTION_TASK
+ );
for (TaskStatusPlus status : compactionTasks) {
final TaskPayloadResponse response =
FutureUtils.getUnchecked(overlordClient.taskPayload(status.getId()),
true);
@@ -336,62 +334,9 @@ public class CompactSegments implements
CoordinatorCustomDuty
return tuningConfig.getPartitionsSpec() instanceof
DimensionRangePartitionsSpec;
}
- private static List<TaskStatusPlus>
filterNonCompactionTasks(CloseableIterator<TaskStatusPlus> taskStatuses)
- {
- final List<TaskStatusPlus> retVal = new ArrayList<>();
-
- try (final Closer closer = Closer.create()) {
- closer.register(taskStatuses);
- while (taskStatuses.hasNext()) {
- final TaskStatusPlus status = taskStatuses.next();
-
- // taskType can be null if middleManagers are running with an older
version. Here, we consevatively regard
- // the tasks of the unknown taskType as the compactionTask. This is
because it's important to not run
- // compactionTasks more than the configured limit at any time which
might impact to the ingestion
- // performance.
- if (status.getType() == null ||
COMPACTION_TASK_TYPE.equals(status.getType())) {
- retVal.add(status);
- }
- }
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- return retVal;
- }
-
private int getCompactionTaskCapacity(CoordinatorCompactionConfig
dynamicConfig)
{
- int totalWorkerCapacity;
- try {
- final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
- FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
-
- if (dynamicConfig.isUseAutoScaleSlots() &&
workerCapacityInfo.getMaximumCapacityWithAutoScale() > 0) {
- totalWorkerCapacity =
workerCapacityInfo.getMaximumCapacityWithAutoScale();
- } else {
- totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
- }
- }
- catch (ExecutionException e) {
- // Call to getTotalWorkerCapacity may fail during a rolling upgrade: API
was added in 0.23.0.
- if (e.getCause() instanceof HttpResponseException
- && ((HttpResponseException)
e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
- LOG.noStackTrace().warn(e, "Call to getTotalWorkerCapacity failed.
Falling back to getWorkers.");
- totalWorkerCapacity =
- FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
- .stream()
- .mapToInt(worker -> worker.getWorker().getCapacity())
- .sum();
- } else {
- throw new RuntimeException(e.getCause());
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
+ int totalWorkerCapacity =
CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient);
return Math.min(
(int) (totalWorkerCapacity *
dynamicConfig.getCompactionTaskSlotRatio()),
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java
new file mode 100644
index 0000000000..f6f31173fa
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.rpc.HttpResponseException;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Utility methods that are useful for coordinator duties
+ */
+public class CoordinatorDutyUtils
+{
+
+ private static final Logger LOG = new Logger(CoordinatorDutyUtils.class);
+
+ /**
+ * Returns the total worker capacity in the cluster, including autoscaling,
if enabled.
+ *
+ * @param overlordClient The overlord client used to get worker capacity
info.
+ *
+ * @return the total worker capacity in the cluster, including autoscaling,
if enabled.
+ */
+ public static int getTotalWorkerCapacity(@Nonnull final OverlordClient
overlordClient)
+ {
+ int totalWorkerCapacity;
+ try {
+ final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
+ FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
+ totalWorkerCapacity =
workerCapacityInfo.getMaximumCapacityWithAutoScale();
+ if (totalWorkerCapacity < 0) {
+ totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
+ }
+ }
+ catch (ExecutionException e) {
+ // Call to getTotalWorkerCapacity may fail during a rolling upgrade: API
was added in 0.23.0.
+ if (e.getCause() instanceof HttpResponseException
+ && ((HttpResponseException)
e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+ LOG.noStackTrace().warn(e, "Call to getTotalWorkerCapacity failed.
Falling back to getWorkers.");
+ totalWorkerCapacity =
+ FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
+ .stream()
+ .mapToInt(worker -> worker.getWorker().getCapacity())
+ .sum();
+ } else {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+
+ return totalWorkerCapacity;
+ }
+
+ /**
+ * Return the active tasks that match the task predicate provided. The list of active tasks returned
+ * may be an overestimate, as tasks whose statuses have null types will be conservatively counted as matching the
+ * predicate provided.
+ *
+ * @param overlordClient The overlord client to use to retrieve the list of active tasks.
+ * @param taskPredicate The predicate to match against the list of retrieved task statuses.
+ * This predicate will never be called with a null task status.
+ *
+ * @return the active tasks that match the task predicate provided
+ */
+ public static List<TaskStatusPlus> getNumActiveTaskSlots(
+ @Nonnull final OverlordClient overlordClient,
+ final Predicate<TaskStatusPlus> taskPredicate
+ )
+ {
+ final CloseableIterator<TaskStatusPlus> activeTasks =
+ FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0),
true);
+ // Fetch currently running tasks that match the predicate
+ List<TaskStatusPlus> taskStatuses = new ArrayList<>();
+
+ try (final Closer closer = Closer.create()) {
+ closer.register(activeTasks);
+ while (activeTasks.hasNext()) {
+ final TaskStatusPlus status = activeTasks.next();
+
+ // taskType can be null if middleManagers are running with an older version. Here, we conservatively regard
+ // the tasks of the unknown taskType as matching the predicate. This is because it's important to not run
+ // tasks of the matched type (e.g. compaction or kill tasks) more than the configured limit at any time, which
+ // might impact ingestion performance.
+ if (null != status && (null == status.getType() ||
(taskPredicate.apply(status)))) {
+ taskStatuses.add(status);
+ }
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return taskStatuses;
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 97bd2ab388..bbd58bfe63 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -21,32 +21,29 @@ package org.apache.druid.server.coordinator.duty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
-import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.SegmentsMetadataManager;
-import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.utils.CollectionUtils;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.ExecutionException;
/**
* Completely removes information about unused segments who have an interval
end that comes before
@@ -61,6 +58,9 @@ public class KillUnusedSegments implements CoordinatorDuty
{
public static final String KILL_TASK_TYPE = "kill";
public static final String TASK_ID_PREFIX = "coordinator-issued";
+ public static final Predicate<TaskStatusPlus> IS_AUTO_KILL_TASK =
+ status -> null != status
+ && (KILL_TASK_TYPE.equals(status.getType()) &&
status.getId().startsWith(TASK_ID_PREFIX));
private static final Logger log = new Logger(KillUnusedSegments.class);
private final long period;
@@ -112,69 +112,111 @@ public class KillUnusedSegments implements
CoordinatorDuty
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
{
+
+ final long currentTimeMillis = System.currentTimeMillis();
+ if (lastKillTime + period > currentTimeMillis) {
+ log.debug("Skipping kill of unused segments as kill period has not
elapsed yet.");
+ return params;
+ }
+ TaskStats taskStats = new TaskStats();
Collection<String> dataSourcesToKill =
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
double killTaskSlotRatio =
params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
int maxKillTaskSlots =
params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
- int availableKillTaskSlots = getAvailableKillTaskSlots(killTaskSlotRatio,
maxKillTaskSlots);
- if (0 == availableKillTaskSlots) {
- log.debug("Not killing any unused segments because there are no
available kill task slots at this time.");
- return params;
- }
+ int killTaskCapacity = getKillTaskCapacity(
+ CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
+ killTaskSlotRatio,
+ maxKillTaskSlots
+ );
+ int availableKillTaskSlots = getAvailableKillTaskSlots(
+ killTaskCapacity,
+ CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient,
IS_AUTO_KILL_TASK).size()
+ );
+ final CoordinatorRunStats stats = params.getCoordinatorStats();
- // If no datasource has been specified, all are eligible for killing
unused segments
- if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
- dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
- }
+ taskStats.availableTaskSlots = availableKillTaskSlots;
+ taskStats.maxSlots = killTaskCapacity;
+
+ if (0 < availableKillTaskSlots) {
+ // If no datasource has been specified, all are eligible for killing
unused segments
+ if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+ dataSourcesToKill =
segmentsMetadataManager.retrieveAllDataSourceNames();
+ }
- final long currentTimeMillis = System.currentTimeMillis();
- if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
- log.debug("No eligible datasource to kill unused segments.");
- } else if (lastKillTime + period > currentTimeMillis) {
- log.debug("Skipping kill of unused segments as kill period has not
elapsed yet.");
- } else {
log.debug("Killing unused segments in datasources: %s",
dataSourcesToKill);
lastKillTime = currentTimeMillis;
- killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
+ taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill,
availableKillTaskSlots);
+
}
+ addStats(taskStats, stats);
return params;
}
- private void killUnusedSegments(Collection<String> dataSourcesToKill, int
availableKillTaskSlots)
+ private void addStats(
+ TaskStats taskStats,
+ CoordinatorRunStats stats
+ )
{
- int submittedTasks = 0;
- for (String dataSource : dataSourcesToKill) {
- if (submittedTasks >= availableKillTaskSlots) {
- log.info(StringUtils.format(
- "Submitted [%d] kill tasks and reached kill task slot limit [%d].
Will resume "
- + "on the next coordinator cycle.", submittedTasks,
availableKillTaskSlots));
- break;
- }
- final Interval intervalToKill = findIntervalForKill(dataSource);
- if (intervalToKill == null) {
- continue;
- }
+ stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
+ stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
+ stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
+ }
- try {
- FutureUtils.getUnchecked(overlordClient.runKillTask(
- TASK_ID_PREFIX,
- dataSource,
- intervalToKill,
- maxSegmentsToKill
- ), true);
- ++submittedTasks;
- }
- catch (Exception ex) {
- log.error(ex, "Failed to submit kill task for dataSource [%s]",
dataSource);
- if (Thread.currentThread().isInterrupted()) {
- log.warn("skipping kill task scheduling because thread is
interrupted.");
+ private int killUnusedSegments(
+ Collection<String> dataSourcesToKill,
+ int availableKillTaskSlots
+ )
+ {
+ int submittedTasks = 0;
+ if (0 < availableKillTaskSlots &&
!CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+ for (String dataSource : dataSourcesToKill) {
+ if (submittedTasks >= availableKillTaskSlots) {
+ log.info(StringUtils.format(
+ "Submitted [%d] kill tasks and reached kill task slot limit
[%d]. Will resume "
+ + "on the next coordinator cycle.", submittedTasks,
availableKillTaskSlots));
break;
}
+ final Interval intervalToKill = findIntervalForKill(dataSource);
+ if (intervalToKill == null) {
+ continue;
+ }
+
+ try {
+ FutureUtils.getUnchecked(overlordClient.runKillTask(
+ TASK_ID_PREFIX,
+ dataSource,
+ intervalToKill,
+ maxSegmentsToKill
+ ), true);
+ ++submittedTasks;
+ }
+ catch (Exception ex) {
+ log.error(ex, "Failed to submit kill task for dataSource [%s]",
dataSource);
+ if (Thread.currentThread().isInterrupted()) {
+ log.warn("skipping kill task scheduling because thread is
interrupted.");
+ break;
+ }
+ }
}
}
- log.debug("Submitted [%d] kill tasks for [%d] datasources.",
submittedTasks, dataSourcesToKill.size());
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Submitted [%d] kill tasks for [%d] datasources.%s",
+ submittedTasks,
+ dataSourcesToKill.size(),
+ availableKillTaskSlots < dataSourcesToKill.size()
+ ? StringUtils.format(
+ " Datasources skipped: %s",
+ ImmutableList.copyOf(dataSourcesToKill).subList(submittedTasks,
dataSourcesToKill.size())
+ )
+ : ""
+ );
+ }
+
+ // report stats
+ return submittedTasks;
}
/**
@@ -199,86 +241,31 @@ public class KillUnusedSegments implements CoordinatorDuty
}
}
- private int getAvailableKillTaskSlots(double killTaskSlotRatio, int
maxKillTaskSlots)
+ private int getAvailableKillTaskSlots(int killTaskCapacity, int
numActiveKillTasks)
{
return Math.max(
0,
- getKillTaskCapacity(getTotalWorkerCapacity(), killTaskSlotRatio,
maxKillTaskSlots) - getNumActiveKillTaskSlots()
+ killTaskCapacity - numActiveKillTasks
);
}
- /**
- * Get the number of active kill task slots in use. The kill tasks counted,
are only those thare are submitted
- * by this coordinator duty (have prefix {@link
KillUnusedSegments#TASK_ID_PREFIX}. The value returned here
- * may be an overestimate, as in some cased the taskType can be null if
middleManagers are running with an older
- * version, and these tasks are counted as active kill tasks to be safe.
- * @return
- */
- private int getNumActiveKillTaskSlots()
+ @VisibleForTesting
+ static int getKillTaskCapacity(int totalWorkerCapacity, double
killTaskSlotRatio, int maxKillTaskSlots)
{
- final CloseableIterator<TaskStatusPlus> activeTasks =
- FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0),
true);
- // Fetch currently running kill tasks
- int numActiveKillTasks = 0;
-
- try (final Closer closer = Closer.create()) {
- closer.register(activeTasks);
- while (activeTasks.hasNext()) {
- final TaskStatusPlus status = activeTasks.next();
-
- // taskType can be null if middleManagers are running with an older
version. Here, we consevatively regard
- // the tasks of the unknown taskType as the killTask. This is because
it's important to not run
- // killTasks more than the configured limit at any time which might
impact to the ingestion
- // performance.
- if (status.getType() == null
- || (KILL_TASK_TYPE.equals(status.getType()) &&
status.getId().startsWith(TASK_ID_PREFIX))) {
- numActiveKillTasks++;
- }
- }
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- return numActiveKillTasks;
+ return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio,
1.0)), maxKillTaskSlots);
}
- private int getTotalWorkerCapacity()
+ static class TaskStats
{
- int totalWorkerCapacity;
- try {
- final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
- FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
- totalWorkerCapacity =
workerCapacityInfo.getMaximumCapacityWithAutoScale();
- if (totalWorkerCapacity < 0) {
- totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
- }
- }
- catch (ExecutionException e) {
- // Call to getTotalWorkerCapacity may fail during a rolling upgrade: API
was added in 0.23.0.
- if (e.getCause() instanceof HttpResponseException
- && ((HttpResponseException)
e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
- log.noStackTrace().warn(e, "Call to getTotalWorkerCapacity failed.
Falling back to getWorkers.");
- totalWorkerCapacity =
- FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
- .stream()
- .mapToInt(worker -> worker.getWorker().getCapacity())
- .sum();
- } else {
- throw new RuntimeException(e.getCause());
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
+ int availableTaskSlots;
+ int maxSlots;
+ int submittedTasks;
- return totalWorkerCapacity;
- }
-
- @VisibleForTesting
- static int getKillTaskCapacity(int totalWorkerCapacity, double
killTaskSlotRatio, int maxKillTaskSlots)
- {
- return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio,
1.0)), maxKillTaskSlots);
+ TaskStats()
+ {
+ availableTaskSlots = 0;
+ maxSlots = 0;
+ submittedTasks = 0;
+ }
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
index ac37767327..2f97972dc7 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
@@ -141,6 +141,12 @@ public class Stats
= CoordinatorStat.toDebugAndEmit("killedAuditLogs",
"metadata/kill/audit/count");
public static final CoordinatorStat DATASOURCES
= CoordinatorStat.toDebugAndEmit("killedDatasources",
"metadata/kill/datasource/count");
+ public static final CoordinatorStat AVAILABLE_SLOTS
+ = CoordinatorStat.toDebugAndEmit("killAvailSlots",
"killTask/availableSlot/count");
+ public static final CoordinatorStat MAX_SLOTS
+ = CoordinatorStat.toDebugAndEmit("killMaxSlots",
"killTask/maxSlot/count");
+ public static final CoordinatorStat SUBMITTED_TASKS
+ = CoordinatorStat.toDebugAndEmit("killTasks", "kill/task/count");
}
public static class Balancer
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index e67063fb7b..5d0c81385c 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -33,6 +33,8 @@ import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
@@ -76,6 +78,8 @@ public class KillUnusedSegmentsTest
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private DruidCoordinatorConfig config;
+ @Mock
+ private CoordinatorRunStats stats;
@Mock
private DruidCoordinatorRuntimeParams params;
@Mock
@@ -94,6 +98,7 @@ public class KillUnusedSegmentsTest
public void setup()
{
Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
+ Mockito.doReturn(stats).when(params).getCoordinatorStats();
Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
@@ -181,6 +186,7 @@ public class KillUnusedSegmentsTest
);
mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
runAndVerifyKillInterval(expectedKillInterval);
+ verifyStats(9, 1, 10);
}
@Test
@@ -198,6 +204,7 @@ public class KillUnusedSegmentsTest
);
mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
runAndVerifyKillInterval(expectedKillInterval);
+ verifyStats(9, 1, 10);
}
@Test
@@ -214,6 +221,7 @@ public class KillUnusedSegmentsTest
);
mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
runAndVerifyKillInterval(expectedKillInterval);
+ verifyStats(9, 1, 10);
}
@Test
@@ -226,6 +234,7 @@ public class KillUnusedSegmentsTest
mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
// Only 1 unused segment is killed
runAndVerifyKillInterval(yearOldSegment.getInterval());
+ verifyStats(9, 1, 10);
}
@Test
@@ -233,6 +242,7 @@ public class KillUnusedSegmentsTest
{
mockTaskSlotUsage(0.10, 10, 1, 5);
runAndVerifyNoKill();
+ verifyStats(0, 0, 0);
}
@Test
@@ -279,7 +289,15 @@ public class KillUnusedSegmentsTest
private void runAndVerifyKillInterval(Interval expectedKillInterval)
{
int limit = config.getCoordinatorKillMaxSegments();
+ Mockito.doReturn(Futures.immediateFuture("ok"))
+ .when(overlordClient)
+ .runKillTask(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.any(Interval.class),
+ ArgumentMatchers.anyInt());
target.run(params);
+
Mockito.verify(overlordClient, Mockito.times(1)).runKillTask(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("DS1"),
@@ -288,6 +306,13 @@ public class KillUnusedSegmentsTest
);
}
+ private void verifyStats(int availableSlots, int submittedTasks, int
maxSlots)
+ {
+ Mockito.verify(stats).add(Stats.Kill.AVAILABLE_SLOTS, availableSlots);
+ Mockito.verify(stats).add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
+ Mockito.verify(stats).add(Stats.Kill.MAX_SLOTS, maxSlots);
+ }
+
private void runAndVerifyNoKill()
{
target.run(params);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]