This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 647686aee2 Add test and metrics for KillStalePendingSegments duty (#14951)
647686aee2 is described below

commit 647686aee2a0cf29cc50a13c53d97b61ba479e0c
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Sep 8 10:33:47 2023 +0530

    Add test and metrics for KillStalePendingSegments duty (#14951)
    
    Changes:
    - Add new metric `kill/pendingSegments/count` with dimension `dataSource`
    - Add tests for `KillStalePendingSegments`
    - Reduce no-op logs printed for each datasource even when no pending
      segments have been deleted. This can get particularly noisy at low
      values of `indexingPeriod`.
    - Refactor the code in `KillStalePendingSegments` for readability and
      add javadocs
---
 docs/operations/metrics.md                         |   1 +
 .../apache/druid/java/util/common/DateTimes.java   |  31 +++
 .../druid/java/util/common/DateTimesTest.java      |  30 +++
 .../coordinator/CoordinatorDynamicConfig.java      |  15 +-
 .../coordinator/KillStalePendingSegments.java      | 106 ---------
 .../coordinator/duty/KillStalePendingSegments.java | 139 +++++++++++
 .../druid/server/coordinator/stats/Stats.java      |   2 +
 .../duty/KillStalePendingSegmentsTest.java         | 264 +++++++++++++++++++++
 .../java/org/apache/druid/cli/CliCoordinator.java  |   2 +-
 9 files changed, 475 insertions(+), 115 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 8913cd39a2..c2a77a4f3d 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -334,6 +334,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
 |`killTask/availableSlot/count`| Number of available task slots that can be used for auto kill tasks in the auto kill run. This is the max number of task slots minus any currently running auto kill tasks. [...]
 |`killTask/maxSlot/count`| Maximum number of task slots available for auto kill tasks in the auto kill run. [...]
 |`kill/task/count`| Number of tasks issued in the auto kill run. [...]
+|`kill/pendingSegments/count`|Number of stale pending segments deleted from the metadata store.|`dataSource`|Varies|
 |`segment/waitCompact/bytes`|Total bytes of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
 |`segment/waitCompact/count`|Total number of segments of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
 |`interval/waitCompact/count`|Total number of intervals of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
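
The new stat is registered with `CoordinatorStat.toDebugAndEmit`, so it is
both logged for debugging and emitted as the metric documented above. A
minimal sketch of how a duty records it, using only classes touched by this
commit (the datasource name and count are illustrative):

    // Record that 10 stale pending segments of "wiki" were deleted.
    params.getCoordinatorStats().add(
        Stats.Kill.PENDING_SEGMENTS,               // emitted as "kill/pendingSegments/count"
        RowKey.of(Dimension.DATASOURCE, "wiki"),   // the dataSource dimension
        10
    );
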
diff --git a/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java b/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java
index b9c8daff0d..2faebaec33 100644
--- a/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java
+++ b/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java
@@ -20,7 +20,9 @@
 package org.apache.druid.java.util.common;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Ordering;
 import io.netty.util.SuppressForbidden;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.joda.time.Chronology;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -30,6 +32,7 @@ import org.joda.time.chrono.ISOChronology;
 import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
 
+import java.util.Objects;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.regex.Pattern;
@@ -194,6 +197,34 @@ public final class DateTimes
            && ISOChronology.getInstanceUTC().equals(dateTime.getChronology());
   }
 
+  /**
+   * Returns the earlier of the two given dates. When passed a null and a non-null
+   * date, this method simply returns the non-null value.
+   */
+  public static DateTime earlierOf(DateTime a, DateTime b)
+  {
+    // Put nulls last to select the smaller non-null value
+    if (Objects.compare(a, b, Ordering.natural().nullsLast()) < 0) {
+      return a;
+    } else {
+      return b;
+    }
+  }
+
+  /**
+   * Returns the later of the two given dates. When passed a null and a non-null
+   * date, this method simply returns the non-null value.
+   */
+  public static DateTime laterOf(DateTime a, DateTime b)
+  {
+    // Put nulls first to select the bigger non-null value
+    if (Objects.compare(a, b, Comparators.naturalNullsFirst()) > 0) {
+      return a;
+    } else {
+      return b;
+    }
+  }
+
   private DateTimes()
   {
   }
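
A quick illustration of the null-handling contract of the two helpers added
above (the dates are arbitrary):

    DateTime jan14 = DateTimes.of("2013-01-14");

    DateTimes.earlierOf(jan14, null);   // returns jan14: the non-null value wins
    DateTimes.laterOf(null, jan14);     // returns jan14: the non-null value wins
    DateTimes.earlierOf(null, null);    // returns null
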
diff --git a/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java b/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java
index 55cc0b52a9..867cf3facd 100644
--- a/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java
@@ -123,4 +123,34 @@ public class DateTimesTest
         DateTimes.of("2000").withZone(DateTimes.inferTzFromString("America/Los_Angeles")))
     );
   }
+
+  @Test
+  public void testEarlierOf()
+  {
+    Assert.assertNull(DateTimes.earlierOf(null, null));
+
+    final DateTime jan14 = DateTimes.of("2013-01-14");
+    Assert.assertEquals(jan14, DateTimes.earlierOf(null, jan14));
+    Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, null));
+    Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, jan14));
+
+    final DateTime jan15 = DateTimes.of("2013-01-15");
+    Assert.assertEquals(jan14, DateTimes.earlierOf(jan15, jan14));
+    Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, jan15));
+  }
+
+  @Test
+  public void testLaterOf()
+  {
+    Assert.assertNull(DateTimes.laterOf(null, null));
+
+    final DateTime jan14 = DateTimes.of("2013-01-14");
+    Assert.assertEquals(jan14, DateTimes.laterOf(null, jan14));
+    Assert.assertEquals(jan14, DateTimes.laterOf(jan14, null));
+    Assert.assertEquals(jan14, DateTimes.laterOf(jan14, jan14));
+
+    final DateTime jan15 = DateTimes.of("2013-01-15");
+    Assert.assertEquals(jan15, DateTimes.laterOf(jan15, jan14));
+    Assert.assertEquals(jan15, DateTimes.laterOf(jan14, jan15));
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index 0b39688dbf..4f87c311f9 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
-import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
 import org.apache.druid.server.coordinator.stats.Dimension;
 import org.apache.druid.utils.JvmUtils;
 
@@ -77,19 +76,13 @@ public class CoordinatorDynamicConfig
   private final Map<Dimension, String> validDebugDimensions;
 
   /**
-   * Stale pending segments belonging to the data sources in this list are not killed by {@link
+   * Stale pending segments belonging to the data sources in this list are not killed by {@code
    * KillStalePendingSegments}. In other words, segments in these data sources are "protected".
-   * <p>
-   * Pending segments are considered "stale" when their created_time is older than {@link
-   * KillStalePendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now.
    */
   private final Set<String> dataSourcesToNotKillStalePendingSegmentsIn;
 
   /**
     * The maximum number of segments that can be queued for loading to any given server.
-   *
-   * @see LoadQueuePeon
-   * @see org.apache.druid.server.coordinator.rules.LoadRule#run
    */
   private final int maxSegmentsInNodeLoadingQueue;
   private final boolean pauseCoordination;
@@ -576,6 +569,12 @@ public class CoordinatorDynamicConfig
       return this;
     }
 
+    public Builder withDatasourcesToNotKillPendingSegmentsIn(Set<String> datasources)
+    {
+      this.dataSourcesToNotKillStalePendingSegmentsIn = datasources;
+      return this;
+    }
+
     public Builder withKillTaskSlotRatio(Double killTaskSlotRatio)
     {
       this.killTaskSlotRatio = killTaskSlotRatio;
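
The new builder method is exercised in the test added further below; a
minimal sketch of protecting a datasource from pending-segment cleanup (the
datasource name is illustrative):

    // Pending segments of "koala" will not be cleaned up by KillStalePendingSegments.
    CoordinatorDynamicConfig config = CoordinatorDynamicConfig
        .builder()
        .withDatasourcesToNotKillPendingSegmentsIn(Collections.singleton("koala"))
        .build();
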
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java
deleted file mode 100644
index c2cd2c1fe6..0000000000
--- a/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.indexer.TaskStatusPlus;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.guava.Comparators;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.rpc.indexing.OverlordClient;
-import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.joda.time.Period;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class KillStalePendingSegments implements CoordinatorDuty
-{
-  private static final Logger log = new Logger(KillStalePendingSegments.class);
-  private static final Period KEEP_PENDING_SEGMENTS_OFFSET = new Period("P1D");
-
-  private final OverlordClient overlordClient;
-
-  @Inject
-  public KillStalePendingSegments(OverlordClient overlordClient)
-  {
-    this.overlordClient = overlordClient;
-  }
-
-  @Override
-  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
-  {
-    final List<DateTime> createdTimes = new ArrayList<>();
-
-    // Include one complete status so we can get the time of the last-created complete task. (The Overlord API returns
-    // complete tasks in descending order of created_date.)
-    final List<TaskStatusPlus> statuses =
-        ImmutableList.copyOf(FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 1), true));
-    createdTimes.add(
-        statuses
-            .stream()
-            .filter(status -> status.getStatusCode() == null || !status.getStatusCode().isComplete())
-            .map(TaskStatusPlus::getCreatedTime)
-            .min(Comparators.naturalNullsFirst())
-            .orElse(DateTimes.nowUtc()) // If there are no active tasks, this returns the current time.
-    );
-
-    final TaskStatusPlus completeTaskStatus =
-        statuses.stream()
-                .filter(status -> status != null && status.getStatusCode().isComplete())
-                .findFirst()
-                .orElse(null);
-    if (completeTaskStatus != null) {
-      createdTimes.add(completeTaskStatus.getCreatedTime());
-    }
-    createdTimes.sort(Comparators.naturalNullsFirst());
-
-    // There should be at least one createdTime because the current time is added to the 'createdTimes' list if there
-    // is no running/pending/waiting tasks.
-    Preconditions.checkState(!createdTimes.isEmpty(), "Failed to gather createdTimes of tasks");
-
-    // If there is no running/pending/waiting/complete tasks, stalePendingSegmentsCutoffCreationTime is
-    // (DateTimes.nowUtc() - KEEP_PENDING_SEGMENTS_OFFSET).
-    final DateTime stalePendingSegmentsCutoffCreationTime = createdTimes.get(0).minus(KEEP_PENDING_SEGMENTS_OFFSET);
-    for (String dataSource : params.getUsedSegmentsTimelinesPerDataSource().keySet()) {
-      if (!params.getCoordinatorDynamicConfig().getDataSourcesToNotKillStalePendingSegmentsIn().contains(dataSource)) {
-        final int pendingSegmentsKilled = FutureUtils.getUnchecked(
-            overlordClient.killPendingSegments(
-                dataSource,
-                new Interval(DateTimes.MIN, stalePendingSegmentsCutoffCreationTime)
-            ),
-            true
-        );
-        log.info(
-            "Killed [%d] pendingSegments created until [%s] for dataSource[%s]",
-            pendingSegmentsKilled,
-            stalePendingSegmentsCutoffCreationTime,
-            dataSource
-        );
-      }
-    }
-    return params;
-  }
-}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java
new file mode 100644
index 0000000000..da730e20c0
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Duty to kill stale pending segments which are not needed anymore. Pending segments
+ * are created when appending realtime or batch tasks allocate segments to build
+ * incremental indexes. Under normal operation, these pending segments get committed
+ * when the task completes and become regular segments. But in case of task failures,
+ * some pending segments might be left around and cause clutter in the metadata store.
+ * <p>
+ * While cleaning up, this duty ensures that the following pending segments are
+ * retained for at least {@link #DURATION_TO_RETAIN}:
+ * <ul>
+ * <li>Pending segments created by any active task (across all datasources)</li>
+ * <li>Pending segments created by the latest completed task (across all datasources)</li>
+ * </ul>
+ */
+public class KillStalePendingSegments implements CoordinatorDuty
+{
+  private static final Logger log = new Logger(KillStalePendingSegments.class);
+  private static final Period DURATION_TO_RETAIN = new Period("P1D");
+
+  private final OverlordClient overlordClient;
+
+  @Inject
+  public KillStalePendingSegments(OverlordClient overlordClient)
+  {
+    this.overlordClient = overlordClient;
+  }
+
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+  {
+    final Set<String> killDatasources = new HashSet<>(
+        params.getUsedSegmentsTimelinesPerDataSource().keySet()
+    );
+    killDatasources.removeAll(
+        params.getCoordinatorDynamicConfig()
+              .getDataSourcesToNotKillStalePendingSegmentsIn()
+    );
+
+    final DateTime minCreatedTime = getMinCreatedTimeToRetain();
+    for (String dataSource : killDatasources) {
+      int pendingSegmentsKilled = FutureUtils.getUnchecked(
+          overlordClient.killPendingSegments(
+              dataSource,
+              new Interval(DateTimes.MIN, minCreatedTime)
+          ),
+          true
+      );
+      if (pendingSegmentsKilled > 0) {
+        log.info(
+            "Killed [%d] pendingSegments created before [%s] for 
datasource[%s].",
+            pendingSegmentsKilled, minCreatedTime, dataSource
+        );
+        params.getCoordinatorStats().add(
+            Stats.Kill.PENDING_SEGMENTS,
+            RowKey.of(Dimension.DATASOURCE, dataSource),
+            pendingSegmentsKilled
+        );
+      }
+    }
+    return params;
+  }
+
+  /**
+   * Computes the minimum created time of retainable pending segments. Any pending
+   * segment created before this time is considered stale and can be safely deleted.
+   * The limit is determined to ensure that pending segments created by any active
+   * task and the latest completed task (across all datasources) are retained for
+   * at least {@link #DURATION_TO_RETAIN}.
+   */
+  private DateTime getMinCreatedTimeToRetain()
+  {
+    // Fetch the statuses of all active tasks and the latest completed task
+    // (The Overlord API returns complete tasks in descending order of created_date.)
+    final List<TaskStatusPlus> statuses = ImmutableList.copyOf(
+        FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 1), true)
+    );
+
+    DateTime earliestActiveTaskStart = DateTimes.nowUtc();
+    DateTime latestCompletedTaskStart = null;
+    for (TaskStatusPlus status : statuses) {
+      if (status.getStatusCode() == null) {
+        // Unknown status
+      } else if (status.getStatusCode().isComplete()) {
+        latestCompletedTaskStart = DateTimes.laterOf(
+            latestCompletedTaskStart,
+            status.getCreatedTime()
+        );
+      } else {
+        earliestActiveTaskStart = DateTimes.earlierOf(
+            earliestActiveTaskStart,
+            status.getCreatedTime()
+        );
+      }
+    }
+
+    return DateTimes.earlierOf(latestCompletedTaskStart, earliestActiveTaskStart)
+                    .minus(DURATION_TO_RETAIN);
+  }
+}
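
To make the retention rule concrete: the cutoff is the earlier of the created
time of the earliest active task (defaulting to now when there is none) and
the created time of the latest completed task, minus DURATION_TO_RETAIN (P1D).
A small sketch with illustrative dates, mirroring the tests below:

    // Illustrative inputs.
    DateTime earliestActiveTaskStart = DateTimes.of("2023-01-01");
    DateTime latestCompletedTaskStart = DateTimes.of("2023-02-01");

    // cutoff = min(2023-01-01, 2023-02-01) - P1D = 2022-12-31
    // Pending segments created before this instant are considered stale.
    DateTime cutoff = DateTimes
        .earlierOf(latestCompletedTaskStart, earliestActiveTaskStart)
        .minus(new Period("P1D"));
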
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
index 84f2d471b0..539d58d559 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
@@ -147,6 +147,8 @@ public class Stats
         = CoordinatorStat.toDebugAndEmit("killMaxSlots", "killTask/maxSlot/count");
     public static final CoordinatorStat SUBMITTED_TASKS
         = CoordinatorStat.toDebugAndEmit("killTasks", "kill/task/count");
+    public static final CoordinatorStat PENDING_SEGMENTS
+        = CoordinatorStat.toDebugAndEmit("killPendingSegs", "kill/pendingSegments/count");
   }
 
   public static class Balancer
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.java
new file mode 100644
index 0000000000..11ea5bd4b5
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.indexing.NoopOverlordClient;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class KillStalePendingSegmentsTest
+{
+  private TestOverlordClient overlordClient;
+  private KillStalePendingSegments killDuty;
+
+  @Before
+  public void setup()
+  {
+    this.overlordClient = new TestOverlordClient();
+    this.killDuty = new KillStalePendingSegments(overlordClient);
+  }
+
+  @Test
+  public void testRetentionStarts1DayBeforeNowWhenNoKnownTask()
+  {
+    DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build();
+    killDuty.run(params);
+
+    final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
+    Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart());
+
+    // Verify that the cutoff time is no later than 1 day ago from now
+    DateTime expectedCutoffTime = DateTimes.nowUtc().minusDays(1);
+    Assert.assertTrue(
+        expectedCutoffTime.getMillis() - observedKillInterval.getEnd().getMillis() <= 100
+    );
+  }
+
+  @Test
+  public void testRetentionStarts1DayBeforeEarliestActiveTask()
+  {
+    final DateTime startOfEarliestActiveTask = DateTimes.of("2023-01-01");
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask, TaskState.RUNNING);
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusHours(2), TaskState.RUNNING);
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusDays(1), TaskState.RUNNING);
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusHours(3), TaskState.RUNNING);
+
+    DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build();
+    killDuty.run(params);
+
+    final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
+    Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart());
+    Assert.assertEquals(startOfEarliestActiveTask.minusDays(1), observedKillInterval.getEnd());
+  }
+
+  @Test
+  public void testRetentionStarts1DayBeforeLatestCompletedTask()
+  {
+    final DateTime startOfLatestCompletedTask = DateTimes.of("2023-01-01");
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.FAILED);
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusHours(2), TaskState.SUCCESS);
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(2), TaskState.FAILED);
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS);
+
+    DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build();
+    killDuty.run(params);
+
+    final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI);
+    Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart());
+    Assert.assertEquals(startOfLatestCompletedTask.minusDays(1), observedKillInterval.getEnd());
+
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
+    Assert.assertEquals(2, stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.WIKI)));
+  }
+
+  @Test
+  public void testRetentionStarts1DayBeforeLatestCompletedOrEarliestActiveTask()
+  {
+    final DateTime startOfLatestCompletedTask = DateTimes.of("2023-02-01");
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, 
TaskState.FAILED);
+
+    final DateTime startOfEarliestActiveTask = DateTimes.of("2023-01-01");
+    overlordClient.addTaskAndSegment(DS.KOALA, startOfEarliestActiveTask, 
TaskState.RUNNING);
+
+    DruidCoordinatorRuntimeParams params = 
createParamsWithDatasources(DS.WIKI, DS.KOALA).build();
+    killDuty.run(params);
+
+    DateTime earliestEligibleTask = 
DateTimes.earlierOf(startOfEarliestActiveTask, startOfLatestCompletedTask);
+    final Interval wikiKillInterval = 
overlordClient.observedKillIntervals.get(DS.WIKI);
+    Assert.assertEquals(DateTimes.MIN, wikiKillInterval.getStart());
+    Assert.assertEquals(earliestEligibleTask.minusDays(1), 
wikiKillInterval.getEnd());
+
+    final Interval koalaKillInterval = 
overlordClient.observedKillIntervals.get(DS.KOALA);
+    Assert.assertEquals(DateTimes.MIN, koalaKillInterval.getStart());
+    Assert.assertEquals(earliestEligibleTask.minusDays(1), 
wikiKillInterval.getEnd());
+  }
+
+  @Test
+  public void testPendingSegmentOfDisallowedDatasourceIsNotDeleted()
+  {
+    DruidCoordinatorRuntimeParams params =
+        createParamsWithDatasources(DS.WIKI, DS.KOALA).withDynamicConfigs(
+            CoordinatorDynamicConfig
+                .builder()
+                .withDatasourcesToNotKillPendingSegmentsIn(
+                    Collections.singleton(DS.KOALA)
+                )
+                .build()
+        ).build();
+
+    DateTime startOfLatestCompletedTask = DateTimes.of("2023-01-01");
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.SUCCESS);
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS);
+    overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(5), TaskState.SUCCESS);
+    overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask, TaskState.SUCCESS);
+    overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS);
+    overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask.minusDays(5), TaskState.SUCCESS);
+
+    killDuty.run(params);
+
+    // Verify that stale pending segments are killed in "wiki" but not in "koala"
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
+    Assert.assertTrue(overlordClient.observedKillIntervals.containsKey(DS.WIKI));
+    Assert.assertEquals(2, stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.WIKI)));
+
+    Assert.assertFalse(overlordClient.observedKillIntervals.containsKey(DS.KOALA));
+    Assert.assertEquals(0, stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.KOALA)));
+  }
+
+  private DruidCoordinatorRuntimeParams.Builder createParamsWithDatasources(String... datasources)
+  {
+    DruidCoordinatorRuntimeParams.Builder builder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
+
+    // Create a dummy segment for each of the datasources so that they get added to the timeline
+    Set<DataSegment> usedSegments = new HashSet<>();
+    for (String datasource : datasources) {
+      usedSegments.add(
+          DataSegment.builder().dataSource(datasource).interval(Intervals.ETERNITY)
+                     .version("v1").shardSpec(new NumberedShardSpec(0, 1)).size(100).build()
+      );
+    }
+      );
+    }
+
+    return builder.withUsedSegments(usedSegments);
+  }
+
+  private static class DS
+  {
+    static final String WIKI = "wiki";
+    static final String KOALA = "koala";
+  }
+
+  /**
+   * Simulates an Overlord with a configurable list of tasks and pending segments.
+   */
+  private static class TestOverlordClient extends NoopOverlordClient
+  {
+    private final List<TaskStatusPlus> taskStatuses = new ArrayList<>();
+    private final Map<String, List<DateTime>> datasourceToPendingSegments = new HashMap<>();
+
+    private final Map<String, Interval> observedKillIntervals = new HashMap<>();
+
+    private int taskIdSuffix = 0;
+
    void addTaskAndSegment(String datasource, DateTime createdTime, TaskState state)
+    {
+      taskStatuses.add(
+          new TaskStatusPlus(
+              datasource + "__" + taskIdSuffix++,
+              null, null, createdTime, createdTime, state,
+              state.isComplete() ? RunnerTaskState.NONE : RunnerTaskState.RUNNING,
+              100L, TaskLocation.unknown(), datasource, null
+          )
+      );
+
+      // Add a pending segment with created time 5 minutes after the task was created
+      datasourceToPendingSegments.computeIfAbsent(datasource, ds -> new ArrayList<>())
+                                 .add(createdTime.plusMinutes(5));
+    }
+
+    @Override
+    public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
+        @Nullable String state,
+        @Nullable String dataSource,
+        @Nullable Integer maxCompletedTasks
+    )
+    {
+      return Futures.immediateFuture(
+          CloseableIterators.wrap(taskStatuses.iterator(), null)
+      );
+    }
+
+    @Override
+    public ListenableFuture<Integer> killPendingSegments(String dataSource, Interval interval)
+    {
+      observedKillIntervals.put(dataSource, interval);
+
+      List<DateTime> pendingSegments = datasourceToPendingSegments.remove(dataSource);
+      if (pendingSegments == null || pendingSegments.isEmpty()) {
+        return Futures.immediateFuture(0);
+      }
+
+      List<DateTime> remainingPendingSegments = new ArrayList<>();
+      int numDeletedPendingSegments = 0;
+      for (DateTime createdTime : pendingSegments) {
+        if (createdTime.isBefore(interval.getEnd())) {
+          ++numDeletedPendingSegments;
+        } else {
+          remainingPendingSegments.add(createdTime);
+        }
+      }
+
+      if (remainingPendingSegments.size() > 0) {
+        datasourceToPendingSegments.put(dataSource, remainingPendingSegments);
+      }
+
+      return Futures.immediateFuture(numDeletedPendingSegments);
+    }
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 5327f0b3a7..a7883ceeb5 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -78,7 +78,6 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.server.audit.AuditManagerProvider;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
-import org.apache.druid.server.coordinator.KillStalePendingSegments;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
 import org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategyConfig;
 import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
@@ -91,6 +90,7 @@ import org.apache.druid.server.coordinator.duty.KillAuditLog;
 import org.apache.druid.server.coordinator.duty.KillCompactionConfig;
 import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata;
 import org.apache.druid.server.coordinator.duty.KillRules;
+import org.apache.druid.server.coordinator.duty.KillStalePendingSegments;
 import org.apache.druid.server.coordinator.duty.KillSupervisors;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
 import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;

