This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 647686aee2 Add test and metrics for KillStalePendingSegments duty
(#14951)
647686aee2 is described below
commit 647686aee2a0cf29cc50a13c53d97b61ba479e0c
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Sep 8 10:33:47 2023 +0530
Add test and metrics for KillStalePendingSegments duty (#14951)
Changes:
- Add new metric `kill/pendingSegments/count` with dimension `dataSource`
- Add tests for `KillStalePendingSegments`
- Reduce no-op logs that are emitted for each datasource even when no pending
segments have been deleted. These logs can get particularly noisy at low values
of `indexingPeriod`.
- Refactor the code in `KillStalePendingSegments` for readability and add
javadocs
---
docs/operations/metrics.md | 1 +
.../apache/druid/java/util/common/DateTimes.java | 31 +++
.../druid/java/util/common/DateTimesTest.java | 30 +++
.../coordinator/CoordinatorDynamicConfig.java | 15 +-
.../coordinator/KillStalePendingSegments.java | 106 ---------
.../coordinator/duty/KillStalePendingSegments.java | 139 +++++++++++
.../druid/server/coordinator/stats/Stats.java | 2 +
.../duty/KillStalePendingSegmentsTest.java | 264 +++++++++++++++++++++
.../java/org/apache/druid/cli/CliCoordinator.java | 2 +-
9 files changed, 475 insertions(+), 115 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 8913cd39a2..c2a77a4f3d 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -334,6 +334,7 @@ These metrics are for the Druid Coordinator and are reset
each time the Coordina
|`killTask/availableSlot/count`| Number of available task slots that can be
used for auto kill tasks in the auto kill run. This is the max number of task
slots minus any currently running auto kill tasks.
[...]
|`killTask/maxSlot/count`| Maximum number of task slots available for auto
kill tasks in the auto kill run.
[...]
|`kill/task/count`| Number of tasks issued in the auto kill run.
[...]
+|`kill/pendingSegments/count`|Number of stale pending segments deleted from
the metadata store.|`dataSource`|Varies|
|`segment/waitCompact/bytes`|Total bytes of this datasource waiting to be
compacted by the auto compaction (only consider intervals/segments that are
eligible for auto compaction).|`dataSource`|Varies|
|`segment/waitCompact/count`|Total number of segments of this datasource
waiting to be compacted by the auto compaction (only consider
intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
|`interval/waitCompact/count`|Total number of intervals of this datasource
waiting to be compacted by the auto compaction (only consider
intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java
b/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java
index b9c8daff0d..2faebaec33 100644
--- a/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java
+++ b/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java
@@ -20,7 +20,9 @@
package org.apache.druid.java.util.common;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Ordering;
import io.netty.util.SuppressForbidden;
+import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -30,6 +32,7 @@ import org.joda.time.chrono.ISOChronology;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
+import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Pattern;
@@ -194,6 +197,34 @@ public final class DateTimes
&& ISOChronology.getInstanceUTC().equals(dateTime.getChronology());
}
+ /**
+ * Returns the earlier of the two given dates. When passed a null and a
+ * non-null date, this method simply returns the non-null value. When both
+ * dates are null, null is returned. When the two dates compare as equal,
+ * {@code b} is returned.
+ *
+ * @param a first date to compare, may be null
+ * @param b second date to compare, may be null
+ * @return the earlier of {@code a} and {@code b}
+ */
+ public static DateTime earlierOf(DateTime a, DateTime b)
+ {
+ // Put nulls last to select the smaller non-null value
+ if (Objects.compare(a, b, Ordering.natural().nullsLast()) < 0) {
+ return a;
+ } else {
+ return b;
+ }
+ }
+
+ /**
+ * Returns the later of the two given dates. When passed a null and a
+ * non-null date, this method simply returns the non-null value. When both
+ * dates are null, null is returned. When the two dates compare as equal,
+ * {@code b} is returned.
+ *
+ * @param a first date to compare, may be null
+ * @param b second date to compare, may be null
+ * @return the later of {@code a} and {@code b}
+ */
+ public static DateTime laterOf(DateTime a, DateTime b)
+ {
+ // Put nulls first to select the bigger non-null value
+ if (Objects.compare(a, b, Comparators.naturalNullsFirst()) > 0) {
+ return a;
+ } else {
+ return b;
+ }
+ }
+
private DateTimes()
{
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java
b/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java
index 55cc0b52a9..867cf3facd 100644
---
a/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java
@@ -123,4 +123,34 @@ public class DateTimesTest
DateTimes.of("2000").withZone(DateTimes.inferTzFromString("America/Los_Angeles")))
);
}
+
+ @Test
+ public void testEarlierOf()
+ {
+ Assert.assertNull(DateTimes.earlierOf(null, null));
+
+ final DateTime jan14 = DateTimes.of("2013-01-14");
+ Assert.assertEquals(jan14, DateTimes.earlierOf(null, jan14));
+ Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, null));
+ Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, jan14));
+
+ final DateTime jan15 = DateTimes.of("2013-01-15");
+ Assert.assertEquals(jan14, DateTimes.earlierOf(jan15, jan14));
+ Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, jan15));
+ }
+
+ @Test
+ public void testLaterOf()
+ {
+ Assert.assertNull(DateTimes.laterOf(null, null));
+
+ final DateTime jan14 = DateTimes.of("2013-01-14");
+ Assert.assertEquals(jan14, DateTimes.laterOf(null, jan14));
+ Assert.assertEquals(jan14, DateTimes.laterOf(jan14, null));
+ Assert.assertEquals(jan14, DateTimes.laterOf(jan14, jan14));
+
+ final DateTime jan15 = DateTimes.of("2013-01-15");
+ Assert.assertEquals(jan15, DateTimes.laterOf(jan15, jan14));
+ Assert.assertEquals(jan15, DateTimes.laterOf(jan14, jan15));
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index 0b39688dbf..4f87c311f9 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.utils.JvmUtils;
@@ -77,19 +76,13 @@ public class CoordinatorDynamicConfig
private final Map<Dimension, String> validDebugDimensions;
/**
- * Stale pending segments belonging to the data sources in this list are not
killed by {@link
+ * Stale pending segments belonging to the data sources in this list are not
killed by {@code
* KillStalePendingSegments}. In other words, segments in these data sources
are "protected".
- * <p>
- * Pending segments are considered "stale" when their created_time is older
than {@link
- * KillStalePendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now.
*/
private final Set<String> dataSourcesToNotKillStalePendingSegmentsIn;
/**
* The maximum number of segments that can be queued for loading to any
given server.
- *
- * @see LoadQueuePeon
- * @see org.apache.druid.server.coordinator.rules.LoadRule#run
*/
private final int maxSegmentsInNodeLoadingQueue;
private final boolean pauseCoordination;
@@ -576,6 +569,12 @@ public class CoordinatorDynamicConfig
return this;
}
+ public Builder withDatasourcesToNotKillPendingSegmentsIn(Set<String>
datasources)
+ {
+ this.dataSourcesToNotKillStalePendingSegmentsIn = datasources;
+ return this;
+ }
+
public Builder withKillTaskSlotRatio(Double killTaskSlotRatio)
{
this.killTaskSlotRatio = killTaskSlotRatio;
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java
deleted file mode 100644
index c2cd2c1fe6..0000000000
---
a/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.indexer.TaskStatusPlus;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.guava.Comparators;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.rpc.indexing.OverlordClient;
-import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.joda.time.Period;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class KillStalePendingSegments implements CoordinatorDuty
-{
- private static final Logger log = new Logger(KillStalePendingSegments.class);
- private static final Period KEEP_PENDING_SEGMENTS_OFFSET = new Period("P1D");
-
- private final OverlordClient overlordClient;
-
- @Inject
- public KillStalePendingSegments(OverlordClient overlordClient)
- {
- this.overlordClient = overlordClient;
- }
-
- @Override
- public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
- {
- final List<DateTime> createdTimes = new ArrayList<>();
-
- // Include one complete status so we can get the time of the last-created
complete task. (The Overlord API returns
- // complete tasks in descending order of created_date.)
- final List<TaskStatusPlus> statuses =
-
ImmutableList.copyOf(FutureUtils.getUnchecked(overlordClient.taskStatuses(null,
null, 1), true));
- createdTimes.add(
- statuses
- .stream()
- .filter(status -> status.getStatusCode() == null ||
!status.getStatusCode().isComplete())
- .map(TaskStatusPlus::getCreatedTime)
- .min(Comparators.naturalNullsFirst())
- .orElse(DateTimes.nowUtc()) // If there are no active tasks, this
returns the current time.
- );
-
- final TaskStatusPlus completeTaskStatus =
- statuses.stream()
- .filter(status -> status != null &&
status.getStatusCode().isComplete())
- .findFirst()
- .orElse(null);
- if (completeTaskStatus != null) {
- createdTimes.add(completeTaskStatus.getCreatedTime());
- }
- createdTimes.sort(Comparators.naturalNullsFirst());
-
- // There should be at least one createdTime because the current time is
added to the 'createdTimes' list if there
- // is no running/pending/waiting tasks.
- Preconditions.checkState(!createdTimes.isEmpty(), "Failed to gather
createdTimes of tasks");
-
- // If there is no running/pending/waiting/complete tasks,
stalePendingSegmentsCutoffCreationTime is
- // (DateTimes.nowUtc() - KEEP_PENDING_SEGMENTS_OFFSET).
- final DateTime stalePendingSegmentsCutoffCreationTime =
createdTimes.get(0).minus(KEEP_PENDING_SEGMENTS_OFFSET);
- for (String dataSource :
params.getUsedSegmentsTimelinesPerDataSource().keySet()) {
- if
(!params.getCoordinatorDynamicConfig().getDataSourcesToNotKillStalePendingSegmentsIn().contains(dataSource))
{
- final int pendingSegmentsKilled = FutureUtils.getUnchecked(
- overlordClient.killPendingSegments(
- dataSource,
- new Interval(DateTimes.MIN,
stalePendingSegmentsCutoffCreationTime)
- ),
- true
- );
- log.info(
- "Killed [%d] pendingSegments created until [%s] for
dataSource[%s]",
- pendingSegmentsKilled,
- stalePendingSegmentsCutoffCreationTime,
- dataSource
- );
- }
- }
- return params;
- }
-}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java
new file mode 100644
index 0000000000..da730e20c0
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Duty to kill stale pending segments which are not needed anymore. Pending
segments
+ * are created when appending realtime or batch tasks allocate segments to
build
+ * incremental indexes. Under normal operation, these pending segments get
committed
+ * when the task completes and become regular segments. But in case of task
failures,
+ * some pending segments might be left around and cause clutter in the
metadata store.
+ * <p>
+ * While cleaning up, this duty ensures that the following pending segments are
+ * retained for at least {@link #DURATION_TO_RETAIN}:
+ * <ul>
+ * <li>Pending segments created by any active task (across all
datasources)</li>
+ * <li>Pending segments created by the latest completed task (across all
datasources)</li>
+ * </ul>
+ */
+public class KillStalePendingSegments implements CoordinatorDuty
+{
+ private static final Logger log = new Logger(KillStalePendingSegments.class);
+ private static final Period DURATION_TO_RETAIN = new Period("P1D");
+
+ private final OverlordClient overlordClient;
+
+ @Inject
+ public KillStalePendingSegments(OverlordClient overlordClient)
+ {
+ this.overlordClient = overlordClient;
+ }
+
+ @Override
+ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
+ {
+ final Set<String> killDatasources = new HashSet<>(
+ params.getUsedSegmentsTimelinesPerDataSource().keySet()
+ );
+ killDatasources.removeAll(
+ params.getCoordinatorDynamicConfig()
+ .getDataSourcesToNotKillStalePendingSegmentsIn()
+ );
+
+ final DateTime minCreatedTime = getMinCreatedTimeToRetain();
+ for (String dataSource : killDatasources) {
+ int pendingSegmentsKilled = FutureUtils.getUnchecked(
+ overlordClient.killPendingSegments(
+ dataSource,
+ new Interval(DateTimes.MIN, minCreatedTime)
+ ),
+ true
+ );
+ if (pendingSegmentsKilled > 0) {
+ log.info(
+ "Killed [%d] pendingSegments created before [%s] for
datasource[%s].",
+ pendingSegmentsKilled, minCreatedTime, dataSource
+ );
+ params.getCoordinatorStats().add(
+ Stats.Kill.PENDING_SEGMENTS,
+ RowKey.of(Dimension.DATASOURCE, dataSource),
+ pendingSegmentsKilled
+ );
+ }
+ }
+ return params;
+ }
+
+  /**
+   * Determines the cutoff before which pending segments are treated as stale.
+   * <p>
+   * The cutoff is {@link #DURATION_TO_RETAIN} before the earlier of:
+   * <ul>
+   * <li>the created time of the earliest active task, or the current time if
+   * no active task is known</li>
+   * <li>the created time of the latest completed task, if any</li>
+   * </ul>
+   * Tasks are considered across all datasources. Tasks with an unknown
+   * (null) status code are ignored.
+   *
+   * @return created time before which pending segments may be deleted
+   */
+  private DateTime getMinCreatedTimeToRetain()
+  {
+    // Request all active tasks and at most one completed task from the Overlord
+    // (The Overlord API returns complete tasks in descending order of created_date.)
+    final List<TaskStatusPlus> taskStatuses = ImmutableList.copyOf(
+        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 1), true)
+    );
+
+    DateTime oldestActiveStart = DateTimes.nowUtc();
+    DateTime newestCompletedStart = null;
+    for (TaskStatusPlus taskStatus : taskStatuses) {
+      if (taskStatus.getStatusCode() == null) {
+        // Unknown status, task is neither counted as active nor as completed
+        continue;
+      }
+
+      if (taskStatus.getStatusCode().isComplete()) {
+        newestCompletedStart = DateTimes.laterOf(newestCompletedStart, taskStatus.getCreatedTime());
+      } else {
+        oldestActiveStart = DateTimes.earlierOf(oldestActiveStart, taskStatus.getCreatedTime());
+      }
+    }
+
+    final DateTime earliestStartToProtect = DateTimes.earlierOf(newestCompletedStart, oldestActiveStart);
+    return earliestStartToProtect.minus(DURATION_TO_RETAIN);
+  }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
index 84f2d471b0..539d58d559 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
@@ -147,6 +147,8 @@ public class Stats
= CoordinatorStat.toDebugAndEmit("killMaxSlots",
"killTask/maxSlot/count");
public static final CoordinatorStat SUBMITTED_TASKS
= CoordinatorStat.toDebugAndEmit("killTasks", "kill/task/count");
+ public static final CoordinatorStat PENDING_SEGMENTS
+ = CoordinatorStat.toDebugAndEmit("killPendingSegs",
"kill/pendingSegments/count");
}
public static class Balancer
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.java
new file mode 100644
index 0000000000..11ea5bd4b5
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.indexing.NoopOverlordClient;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class KillStalePendingSegmentsTest
+{
+ private TestOverlordClient overlordClient;
+ private KillStalePendingSegments killDuty;
+
+ @Before
+ public void setup()
+ {
+ this.overlordClient = new TestOverlordClient();
+ this.killDuty = new KillStalePendingSegments(overlordClient);
+ }
+
+  /**
+   * When the Overlord reports no tasks, the kill interval must end exactly
+   * 1 day before the time at which the duty ran.
+   */
+  @Test
+  public void testRetentionStarts1DayBeforeNowWhenNoKnownTask()
+  {
+    DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build();
+
+    // Bracket the run so that the expected cutoff can be bounded on both sides
+    // without depending on how long the duty takes to execute
+    final DateTime timeBeforeRun = DateTimes.nowUtc();
+    killDuty.run(params);
+    final DateTime timeAfterRun = DateTimes.nowUtc();
+
+    final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
+    Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart());
+
+    // Verify that the cutoff time is 1 day before the time of the run:
+    // neither earlier (retaining too little) nor later (retaining too much)
+    final DateTime observedCutoffTime = observedKillInterval.getEnd();
+    Assert.assertFalse(observedCutoffTime.isBefore(timeBeforeRun.minusDays(1)));
+    Assert.assertFalse(observedCutoffTime.isAfter(timeAfterRun.minusDays(1)));
+  }
+
+ @Test
+ public void testRetentionStarts1DayBeforeEarliestActiveTask()
+ {
+ final DateTime startOfEarliestActiveTask = DateTimes.of("2023-01-01");
+ overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask,
TaskState.RUNNING);
+ overlordClient.addTaskAndSegment(DS.WIKI,
startOfEarliestActiveTask.plusHours(2), TaskState.RUNNING);
+ overlordClient.addTaskAndSegment(DS.WIKI,
startOfEarliestActiveTask.plusDays(1), TaskState.RUNNING);
+ overlordClient.addTaskAndSegment(DS.WIKI,
startOfEarliestActiveTask.plusHours(3), TaskState.RUNNING);
+
+ DruidCoordinatorRuntimeParams params =
createParamsWithDatasources(DS.WIKI).build();
+ killDuty.run(params);
+
+ final Interval observedKillInterval =
overlordClient.observedKillIntervals.get(DS.WIKI);
+ Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart());
+ Assert.assertEquals(startOfEarliestActiveTask.minusDays(1),
observedKillInterval.getEnd());
+ }
+
+ @Test
+ public void testRetentionStarts1DayBeforeLatestCompletedTask()
+ {
+ final DateTime startOfLatestCompletedTask = DateTimes.of("2023-01-01");
+ overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask,
TaskState.FAILED);
+ overlordClient.addTaskAndSegment(DS.WIKI,
startOfLatestCompletedTask.minusHours(2), TaskState.SUCCESS);
+ overlordClient.addTaskAndSegment(DS.WIKI,
startOfLatestCompletedTask.minusDays(2), TaskState.FAILED);
+ overlordClient.addTaskAndSegment(DS.WIKI,
startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS);
+
+ DruidCoordinatorRuntimeParams params =
createParamsWithDatasources(DS.WIKI).build();
+ killDuty.run(params);
+
+ final Interval observedKillInterval =
overlordClient.observedKillIntervals.get(DS.WIKI);
+ Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart());
+ Assert.assertEquals(startOfLatestCompletedTask.minusDays(1),
observedKillInterval.getEnd());
+
+ final CoordinatorRunStats stats = params.getCoordinatorStats();
+ Assert.assertEquals(2, stats.get(Stats.Kill.PENDING_SEGMENTS,
RowKey.of(Dimension.DATASOURCE, DS.WIKI)));
+ }
+
+  /**
+   * The cutoff is computed once from tasks across all datasources, so both
+   * "wiki" (latest completed task) and "koala" (earliest active task) must be
+   * killed with the same interval, ending 1 day before the earlier of the two
+   * task start times.
+   */
+  @Test
+  public void testRetentionStarts1DayBeforeLatestCompletedOrEarliestActiveTask()
+  {
+    final DateTime startOfLatestCompletedTask = DateTimes.of("2023-02-01");
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.FAILED);
+
+    final DateTime startOfEarliestActiveTask = DateTimes.of("2023-01-01");
+    overlordClient.addTaskAndSegment(DS.KOALA, startOfEarliestActiveTask, TaskState.RUNNING);
+
+    DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI, DS.KOALA).build();
+    killDuty.run(params);
+
+    DateTime earliestEligibleTask = DateTimes.earlierOf(startOfEarliestActiveTask, startOfLatestCompletedTask);
+    final Interval wikiKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
+    Assert.assertEquals(DateTimes.MIN, wikiKillInterval.getStart());
+    Assert.assertEquals(earliestEligibleTask.minusDays(1), wikiKillInterval.getEnd());
+
+    // Check the end of the koala interval itself, not the wiki interval again
+    final Interval koalaKillInterval = overlordClient.observedKillIntervals.get(DS.KOALA);
+    Assert.assertEquals(DateTimes.MIN, koalaKillInterval.getStart());
+    Assert.assertEquals(earliestEligibleTask.minusDays(1), koalaKillInterval.getEnd());
+  }
+
+ @Test
+ public void testPendingSegmentOfDisallowedDatasourceIsNotDeleted()
+ {
+ DruidCoordinatorRuntimeParams params =
+ createParamsWithDatasources(DS.WIKI, DS.KOALA).withDynamicConfigs(
+ CoordinatorDynamicConfig
+ .builder()
+ .withDatasourcesToNotKillPendingSegmentsIn(
+ Collections.singleton(DS.KOALA)
+ )
+ .build()
+ ).build();
+
+ DateTime startOfLatestCompletedTask = DateTimes.of("2023-01-01");
+ overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask,
TaskState.SUCCESS);
+ overlordClient.addTaskAndSegment(DS.WIKI,
startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS);
+ overlordClient.addTaskAndSegment(DS.WIKI,
startOfLatestCompletedTask.minusDays(5), TaskState.SUCCESS);
+ overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask,
TaskState.SUCCESS);
+ overlordClient.addTaskAndSegment(DS.KOALA,
startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS);
+ overlordClient.addTaskAndSegment(DS.KOALA,
startOfLatestCompletedTask.minusDays(5), TaskState.SUCCESS);
+
+ killDuty.run(params);
+
+ // Verify that stale pending segments are killed in "wiki" but not in
"koala"
+ final CoordinatorRunStats stats = params.getCoordinatorStats();
+
Assert.assertTrue(overlordClient.observedKillIntervals.containsKey(DS.WIKI));
+ Assert.assertEquals(2, stats.get(Stats.Kill.PENDING_SEGMENTS,
RowKey.of(Dimension.DATASOURCE, DS.WIKI)));
+
+
Assert.assertFalse(overlordClient.observedKillIntervals.containsKey(DS.KOALA));
+ Assert.assertEquals(0, stats.get(Stats.Kill.PENDING_SEGMENTS,
RowKey.of(Dimension.DATASOURCE, DS.KOALA)));
+ }
+
+ private DruidCoordinatorRuntimeParams.Builder
createParamsWithDatasources(String... datasources)
+ {
+ DruidCoordinatorRuntimeParams.Builder builder =
DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
+
+ // Create a dummy for each of the datasources so that they get added to
the timeline
+ Set<DataSegment> usedSegments = new HashSet<>();
+ for (String datasource : datasources) {
+ usedSegments.add(
+
DataSegment.builder().dataSource(datasource).interval(Intervals.ETERNITY)
+ .version("v1").shardSpec(new NumberedShardSpec(0,
1)).size(100).build()
+ );
+ }
+
+ return builder.withUsedSegments(usedSegments);
+ }
+
+ private static class DS
+ {
+ static final String WIKI = "wiki";
+ static final String KOALA = "koala";
+ }
+
+ /**
+ * Simulates an Overlord with a configurable list of tasks and pending
segments.
+ */
+ private static class TestOverlordClient extends NoopOverlordClient
+ {
+ private final List<TaskStatusPlus> taskStatuses = new ArrayList<>();
+ private final Map<String, List<DateTime>> datasourceToPendingSegments =
new HashMap<>();
+
+ private final Map<String, Interval> observedKillIntervals = new
HashMap<>();
+
+ private int taskIdSuffix = 0;
+
+ void addTaskAndSegment(String datasource, DateTime createdTime, TaskState
state)
+ {
+ taskStatuses.add(
+ new TaskStatusPlus(
+ datasource + "__" + taskIdSuffix++,
+ null, null, createdTime, createdTime, state,
+ state.isComplete() ? RunnerTaskState.NONE :
RunnerTaskState.RUNNING,
+ 100L, TaskLocation.unknown(), datasource, null
+ )
+ );
+
+ // Add a pending segment with created time 5 minutes after the task was
created
+ datasourceToPendingSegments.computeIfAbsent(datasource, ds -> new
ArrayList<>())
+ .add(createdTime.plusMinutes(5));
+ }
+
+ @Override
+ public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
+ @Nullable String state,
+ @Nullable String dataSource,
+ @Nullable Integer maxCompletedTasks
+ )
+ {
+ return Futures.immediateFuture(
+ CloseableIterators.wrap(taskStatuses.iterator(), null)
+ );
+ }
+
+ @Override
+ public ListenableFuture<Integer> killPendingSegments(String dataSource,
Interval interval)
+ {
+ observedKillIntervals.put(dataSource, interval);
+
+ List<DateTime> pendingSegments =
datasourceToPendingSegments.remove(dataSource);
+ if (pendingSegments == null || pendingSegments.isEmpty()) {
+ return Futures.immediateFuture(0);
+ }
+
+ List<DateTime> remainingPendingSegments = new ArrayList<>();
+ int numDeletedPendingSegments = 0;
+ for (DateTime createdTime : pendingSegments) {
+ if (createdTime.isBefore(interval.getEnd())) {
+ ++numDeletedPendingSegments;
+ } else {
+ remainingPendingSegments.add(createdTime);
+ }
+ }
+
+ if (remainingPendingSegments.size() > 0) {
+ datasourceToPendingSegments.put(dataSource, remainingPendingSegments);
+ }
+
+ return Futures.immediateFuture(numDeletedPendingSegments);
+ }
+ }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 5327f0b3a7..a7883ceeb5 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -78,7 +78,6 @@ import
org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.server.audit.AuditManagerProvider;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
-import org.apache.druid.server.coordinator.KillStalePendingSegments;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
import
org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategyConfig;
import
org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
@@ -91,6 +90,7 @@ import org.apache.druid.server.coordinator.duty.KillAuditLog;
import org.apache.druid.server.coordinator.duty.KillCompactionConfig;
import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata;
import org.apache.druid.server.coordinator.duty.KillRules;
+import org.apache.druid.server.coordinator.duty.KillStalePendingSegments;
import org.apache.druid.server.coordinator.duty.KillSupervisors;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]