This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new deb69d1bc0 Allow coordinator to be configured to kill segments in 
future (#10877)
deb69d1bc0 is described below

commit deb69d1bc03aef784a563260bed1d21505495439
Author: Lucas Capistrant <[email protected]>
AuthorDate: Tue May 10 21:05:15 2022 -0500

    Allow coordinator to be configured to kill segments in future (#10877)
    
    Allow a Druid cluster to kill segments whose interval_end is a date in the 
future. This can be done by setting druid.coordinator.kill.durationToRetain to 
a negative period. For example PT-24H would allow segments to be killed if 
their interval_end date was 24 hours or less into the future at the time that 
the kill task is generated by the system.
    
    A cluster operator can also disregard the 
druid.coordinator.kill.durationToRetain entirely by setting a new 
configuration, druid.coordinator.kill.ignoreDurationToRetain=true. This ignores 
interval_end date when looking for segments to kill, and instead is capable of 
killing any segment marked unused. This new configuration is off by default, 
and a cluster operator should fully understand and accept the risks if they 
enable it.
---
 .../apache/druid/java/util/common/DateTimes.java   |   9 +-
 .../druid/java/util/common/DateTimesTest.java      |   4 +-
 docs/configuration/index.md                        |   3 +-
 .../server/coordinator/DruidCoordinatorConfig.java |   4 +
 .../coordinator/duty/KillUnusedSegments.java       |  51 ++++++-
 .../metadata/SqlSegmentsMetadataManagerTest.java   |   6 +
 .../coordinator/CuratorDruidCoordinatorTest.java   |   3 +-
 .../coordinator/DruidCoordinatorConfigTest.java    |  10 ++
 .../server/coordinator/DruidCoordinatorTest.java   |   4 +-
 .../server/coordinator/HttpLoadQueuePeonTest.java  |   3 +-
 .../server/coordinator/LoadQueuePeonTest.java      |   9 +-
 .../server/coordinator/LoadQueuePeonTester.java    |   3 +-
 .../coordinator/TestDruidCoordinatorConfig.java    |  15 +-
 .../server/coordinator/duty/KillAuditLogTest.java  |  12 +-
 .../coordinator/duty/KillCompactionConfigTest.java |  15 +-
 .../duty/KillDatasourceMetadataTest.java           |  15 +-
 .../server/coordinator/duty/KillRulesTest.java     |  12 +-
 .../coordinator/duty/KillSupervisorsTest.java      |  12 +-
 .../coordinator/duty/KillUnusedSegmentsTest.java   | 168 ++++++++++++++++++++-
 website/.spelling                                  |   1 +
 20 files changed, 315 insertions(+), 44 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/DateTimes.java 
b/core/src/main/java/org/apache/druid/java/util/common/DateTimes.java
index 4129094907..b9c8daff0d 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/DateTimes.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/DateTimes.java
@@ -39,8 +39,9 @@ public final class DateTimes
   public static final DateTime EPOCH = utc(0);
   public static final DateTime MAX = utc(JodaUtils.MAX_INSTANT);
   public static final DateTime MIN = utc(JodaUtils.MIN_INSTANT);
-  public static final DateTime CAN_COMPARE_AS_YEAR_MIN = of("0000-01-01");
-  public static final DateTime CAN_COMPARE_AS_YEAR_MAX = 
of("10000-01-01").minus(1);
+  // The following two DateTime objects are utilities that can be used for 
accurately comparing date strings
+  public static final DateTime COMPARE_DATE_AS_STRING_MIN = of("0000-01-01");
+  public static final DateTime COMPARE_DATE_AS_STRING_MAX = 
of("10000-01-01").minus(1);
 
   public static final UtcFormatter ISO_DATE_TIME = 
wrapFormatter(ISODateTimeFormat.dateTime());
   public static final UtcFormatter ISO_DATE_OPTIONAL_TIME = 
wrapFormatter(ISODateTimeFormat.dateOptionalTimeParser());
@@ -188,8 +189,8 @@ public final class DateTimes
    */
   public static boolean canCompareAsString(final DateTime dateTime)
   {
-    return dateTime.getMillis() >= CAN_COMPARE_AS_YEAR_MIN.getMillis()
-           && dateTime.getMillis() <= CAN_COMPARE_AS_YEAR_MAX.getMillis()
+    return dateTime.getMillis() >= COMPARE_DATE_AS_STRING_MIN.getMillis()
+           && dateTime.getMillis() <= COMPARE_DATE_AS_STRING_MAX.getMillis()
            && ISOChronology.getInstanceUTC().equals(dateTime.getChronology());
   }
 
diff --git 
a/core/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java 
b/core/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java
index bd0341b652..55cc0b52a9 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java
@@ -107,8 +107,8 @@ public class DateTimesTest
     Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.EPOCH));
     
Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("0000-01-01")));
 
-    Assert.assertEquals("0000-01-01T00:00:00.000Z", 
DateTimes.CAN_COMPARE_AS_YEAR_MIN.toString());
-    Assert.assertEquals("9999-12-31T23:59:59.999Z", 
DateTimes.CAN_COMPARE_AS_YEAR_MAX.toString());
+    Assert.assertEquals("0000-01-01T00:00:00.000Z", 
DateTimes.COMPARE_DATE_AS_STRING_MIN.toString());
+    Assert.assertEquals("9999-12-31T23:59:59.999Z", 
DateTimes.COMPARE_DATE_AS_STRING_MAX.toString());
 
     Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("9999")));
     Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("2000")));
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 1e829bce74..46f2b50824 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -812,7 +812,8 @@ These Coordinator static configurations can be defined in 
the `coordinator/runti
 |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not 
the Coordinator clean up old entries in the `pendingSegments` table of metadata 
store. If set to true, Coordinator will check the created time of most recently 
complete task. If it doesn't exist, it finds the created time of the earliest 
running/pending/waiting tasks. Once the created time is found, then for all 
dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic 
configuration](#dynamic-configurat [...]
 |`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator 
should submit kill task for unused segments, that is, hard delete them from 
metadata store and deep storage. If set to true, then for all whitelisted 
dataSources (or optionally all), Coordinator will submit tasks periodically 
based on `period` specified. These kill tasks will delete all unused segments 
except for the last `durationToRetain` period. A whitelist can be set via 
dynamic configuration `killDataSource [...]
 |`druid.coordinator.kill.period`|How often to send kill tasks to the indexing 
service. Value must be greater than `druid.coordinator.period.indexingPeriod`. 
Only applies if kill is turned on.|P1D (1 Day)|
-|`druid.coordinator.kill.durationToRetain`| Do not kill unused segments in 
last `durationToRetain`, must be greater or equal to 0. Only applies and MUST 
be specified if kill is turned on.|`P90D`|
+|`druid.coordinator.kill.durationToRetain`|Only applies if you set 
`druid.coordinator.kill.on` to `true`. This value is ignored if 
`druid.coordinator.kill.ignoreDurationToRetain` is `true`. The value must be 
an ISO8601 period. Druid will not kill unused segments whose interval end 
date is beyond `now - durationToRetain`. `durationToRetain` can be a negative 
ISO8601 period, which results in `now - durationToRetain` being in the 
future.|`P90D`|
+|`druid.coordinator.kill.ignoreDurationToRetain`|A way to override 
`druid.coordinator.kill.durationToRetain` and tell the coordinator that you do 
not care about the end date of unused segment intervals when it comes to 
killing them. If true, the coordinator considers all unused segments as 
eligible to be killed.|false|
 |`druid.coordinator.kill.maxSegments`|Kill at most n unused segments per kill 
task submission, must be greater than 0. Only applies and MUST be specified if 
kill is turned on.|100|
 |`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy 
for the coordinator to use to distribute segments among the historicals. 
`cachingCost` is logically equivalent to `cost` but is more CPU-efficient on 
large clusters. `diskNormalized` weights the costs according to the servers' 
disk usage ratios - there are known issues with this strategy distributing 
segments unevenly across the cluster. `random` distributes segments among 
services randomly.|`cost`|
 |`druid.coordinator.balancer.cachingCost.awaitInitialization`|Whether to wait 
for segment view initialization before creating the `cachingCost` balancing 
strategy. This property is enabled only when 
`druid.coordinator.balancer.strategy` is `cachingCost`. If set to 'true', the 
Coordinator will not start to assign segments, until the segment view is 
initialized. If set to 'false', the Coordinator will fallback to use the `cost` 
balancing strategy only if the segment view is not initialized [...]
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
index 8ad407ac09..c02baf9cc6 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
@@ -51,6 +51,10 @@ public abstract class DruidCoordinatorConfig
   @Default("P90D")
   public abstract Duration getCoordinatorKillDurationToRetain();
 
+  @Config("druid.coordinator.kill.ignoreDurationToRetain")
+  @Default("false")
+  public abstract boolean getCoordinatorKillIgnoreDurationToRetain();
+
   @Config("druid.coordinator.kill.maxSegments")
   @Default("100")
   public abstract int getCoordinatorKillMaxSegments();
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 72eb1242d8..2ab3762ba9 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -30,6 +30,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -37,8 +38,11 @@ import java.util.Collection;
 import java.util.List;
 
 /**
- * Completely removes information about unused segments whose end time is 
older than {@link #retainDuration} from now
- * from the metadata store. This action is called "to kill a segment".
+ * Completely removes information about unused segments whose interval end 
comes before
+ * now - {@link #retainDuration} from the metadata store. retainDuration can 
be a positive or negative duration,
+ * negative meaning the interval end target will be in the future. Also, 
retainDuration can be ignored,
+ * meaning that there is no upper bound to the end interval of segments that 
will be killed. This action is called
+ * "to kill a segment".
  *
  * See org.apache.druid.indexing.common.task.KillUnusedSegmentsTask.
  */
@@ -48,6 +52,7 @@ public class KillUnusedSegments implements CoordinatorDuty
 
   private final long period;
   private final long retainDuration;
+  private final boolean ignoreRetainDuration;
   private final int maxSegmentsToKill;
   private long lastKillTime = 0;
 
@@ -67,8 +72,15 @@ public class KillUnusedSegments implements CoordinatorDuty
         "coordinator kill period must be greater than 
druid.coordinator.period.indexingPeriod"
     );
 
+    this.ignoreRetainDuration = 
config.getCoordinatorKillIgnoreDurationToRetain();
     this.retainDuration = 
config.getCoordinatorKillDurationToRetain().getMillis();
-    Preconditions.checkArgument(this.retainDuration >= 0, "coordinator kill 
retainDuration must be >= 0");
+    if (this.ignoreRetainDuration) {
+      log.debug(
+          "druid.coordinator.kill.durationToRetain [%s] will be ignored when 
discovering segments to kill "
+          + "because you have set 
druid.coordinator.kill.ignoreDurationToRetain to True.",
+          this.retainDuration
+      );
+    }
 
     this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments();
     Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill 
maxSegments must be > 0");
@@ -76,7 +88,7 @@ public class KillUnusedSegments implements CoordinatorDuty
     log.info(
         "Kill Task scheduling enabled with period [%s], retainDuration [%s], 
maxSegmentsToKill [%s]",
         this.period,
-        this.retainDuration,
+        this.ignoreRetainDuration ? "IGNORING" : this.retainDuration,
         this.maxSegmentsToKill
     );
 
@@ -118,12 +130,20 @@ public class KillUnusedSegments implements CoordinatorDuty
     return params;
   }
 
+  /**
+   * For a given datasource and limit of segments that can be killed in one 
task, determine the interval to be
+   * submitted with the kill task.
+   *
+   * @param dataSource dataSource whose unused segments are being killed.
+   * @param limit the maximum number of segments that can be included in the 
kill task.
+   * @return {@link Interval} to be used in the kill task.
+   */
   @VisibleForTesting
   @Nullable
   Interval findIntervalForKill(String dataSource, int limit)
   {
     List<Interval> unusedSegmentIntervals =
-        segmentsMetadataManager.getUnusedSegmentIntervals(dataSource, 
DateTimes.nowUtc().minus(retainDuration), limit);
+        segmentsMetadataManager.getUnusedSegmentIntervals(dataSource, 
getEndTimeUpperLimit(), limit);
 
     if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) {
       return JodaUtils.umbrellaInterval(unusedSegmentIntervals);
@@ -131,4 +151,25 @@ public class KillUnusedSegments implements CoordinatorDuty
       return null;
     }
   }
+
+  /**
+   * Calculate the {@link DateTime} that will form the upper bound when looking 
for segments that are
+   * eligible to be killed. If ignoreDurationToRetain is true, we have no 
upper bound and return a DateTime object
+   * for "max" time that works when comparing date strings.
+   *
+   * @return {@link DateTime} representing the upper bound time used when 
looking for segments to kill.
+   */
+  @VisibleForTesting
+  DateTime getEndTimeUpperLimit()
+  {
+    return ignoreRetainDuration
+           ? DateTimes.COMPARE_DATE_AS_STRING_MAX
+           : DateTimes.nowUtc().minus(retainDuration);
+  }
+
+  @VisibleForTesting
+  Long getRetainDuration()
+  {
+    return retainDuration;
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
index 2c78a11544..6dad542644 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
@@ -377,6 +377,12 @@ public class SqlSegmentsMetadataManagerTest
         sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", 
DateTimes.of("3000"), 1)
     );
 
+    // Test the DateTime maxEndTime argument of getUnusedSegmentIntervals
+    Assert.assertEquals(
+        ImmutableList.of(segment2.getInterval()),
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", 
DateTimes.of(2012, 1, 7, 0, 0), 1)
+    );
+
     Assert.assertEquals(
         ImmutableList.of(segment2.getInterval(), segment1.getInterval()),
         sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", 
DateTimes.of("3000"), 5)
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 3adbbd638e..68621ed606 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -183,7 +183,8 @@ public class CuratorDruidCoordinatorTest extends 
CuratorTestBase
         null,
         null,
         10,
-        new Duration("PT0s")
+        new Duration("PT0s"),
+        false
     );
     sourceLoadQueueChildrenCache = new PathChildrenCache(
         curator,
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
index 9978c33736..169ad39fa0 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
@@ -48,6 +48,7 @@ public class DruidCoordinatorConfigTest
     Assert.assertEquals(new Duration(15 * 60 * 1000), 
config.getLoadTimeoutDelay());
     Assert.assertEquals(Duration.millis(50), 
config.getLoadQueuePeonRepeatDelay());
     Assert.assertTrue(config.getCompactionSkipLockedIntervals());
+    Assert.assertFalse(config.getCoordinatorKillIgnoreDurationToRetain());
 
     //with non-defaults
     Properties props = new Properties();
@@ -62,6 +63,7 @@ public class DruidCoordinatorConfigTest
     props.setProperty("druid.coordinator.load.timeout", "PT1s");
     props.setProperty("druid.coordinator.loadqueuepeon.repeatDelay", 
"PT0.100s");
     props.setProperty("druid.coordinator.compaction.skipLockedIntervals", 
"false");
+    props.setProperty("druid.coordinator.kill.ignoreDurationToRetain", "true");
 
     factory = Config.createFactory(props);
     config = factory.build(DruidCoordinatorConfig.class);
@@ -75,5 +77,13 @@ public class DruidCoordinatorConfigTest
     Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay());
     Assert.assertEquals(Duration.millis(100), 
config.getLoadQueuePeonRepeatDelay());
     Assert.assertFalse(config.getCompactionSkipLockedIntervals());
+    Assert.assertTrue(config.getCoordinatorKillIgnoreDurationToRetain());
+
+    // Test negative druid.coordinator.kill.durationToRetain now that it is 
valid.
+    props = new Properties();
+    props.setProperty("druid.coordinator.kill.durationToRetain", "PT-1s");
+    factory = Config.createFactory(props);
+    config = factory.build(DruidCoordinatorConfig.class);
+    Assert.assertEquals(new Duration("PT-1s"), 
config.getCoordinatorKillDurationToRetain());
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 7a12a7a2bb..26975d9aa9 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -163,7 +163,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
         null,
         null,
         10,
-        new Duration("PT0s")
+        new Duration("PT0s"),
+        false
     );
     pathChildrenCache = new PathChildrenCache(
         curator,
@@ -943,6 +944,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         null,
         10,
         new Duration("PT0s"),
+        false,
         false
     );
     CoordinatorCustomDutyGroup compactSegmentCustomGroup = new 
CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), 
ImmutableList.of(new CompactSegments(differentConfigUsedInCustomGroup, null, 
null)));
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index f83c155dcf..019b59a57b 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -91,7 +91,8 @@ public class HttpLoadQueuePeonTest
       null,
       null,
       10,
-      Duration.ZERO
+      Duration.ZERO,
+      false
   )
   {
     @Override
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
index c5317ba14e..74af5dc282 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
@@ -106,7 +106,8 @@ public class LoadQueuePeonTest extends CuratorTestBase
             null,
             null,
             10,
-            Duration.millis(0)
+            Duration.millis(0),
+            false
         )
     );
 
@@ -311,7 +312,8 @@ public class LoadQueuePeonTest extends CuratorTestBase
             null,
             null,
             10,
-            new Duration("PT1s")
+            new Duration("PT1s"),
+            false
         )
     );
 
@@ -373,7 +375,8 @@ public class LoadQueuePeonTest extends CuratorTestBase
             null,
             null,
             10,
-            new Duration("PT1s")
+            new Duration("PT1s"),
+            false
         )
     );
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
index 73c0b64d7f..e153b3926f 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
@@ -55,7 +55,8 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon
             null,
             null,
             10,
-            new Duration("PT1s")
+            new Duration("PT1s"),
+            false
         )
     );
   }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
index abb500e983..644f0e187f 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
@@ -42,6 +42,7 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
   private final Duration getLoadQueuePeonRepeatDelay;
   private final int coordinatorKillMaxSegments;
   private final boolean compactionSkipLockedIntervals;
+  private final boolean coordinatorKillIgnoreDurationToRetain;
 
   public TestDruidCoordinatorConfig(
       Duration coordinatorStartDelay,
@@ -61,7 +62,8 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
       Duration coordinatorDatasourceKillPeriod,
       Duration coordinatorDatasourceKillDurationToRetain,
       int coordinatorKillMaxSegments,
-      Duration getLoadQueuePeonRepeatDelay
+      Duration getLoadQueuePeonRepeatDelay,
+      boolean coordinatorKillIgnoreDurationToRetain
   )
   {
     this.coordinatorStartDelay = coordinatorStartDelay;
@@ -83,6 +85,7 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
     this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
     this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
     this.compactionSkipLockedIntervals = true;
+    this.coordinatorKillIgnoreDurationToRetain = 
coordinatorKillIgnoreDurationToRetain;
   }
 
   public TestDruidCoordinatorConfig(
@@ -104,7 +107,8 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
       Duration coordinatorDatasourceKillDurationToRetain,
       int coordinatorKillMaxSegments,
       Duration getLoadQueuePeonRepeatDelay,
-      boolean compactionSkipLockedIntervals
+      boolean compactionSkipLockedIntervals,
+      boolean coordinatorKillIgnoreDurationToRetain
   )
   {
     this.coordinatorStartDelay = coordinatorStartDelay;
@@ -126,6 +130,7 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
     this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
     this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
     this.compactionSkipLockedIntervals = compactionSkipLockedIntervals;
+    this.coordinatorKillIgnoreDurationToRetain = 
coordinatorKillIgnoreDurationToRetain;
   }
 
   @Override
@@ -241,4 +246,10 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
   {
     return compactionSkipLockedIntervals;
   }
+
+  @Override
+  public boolean getCoordinatorKillIgnoreDurationToRetain()
+  {
+    return coordinatorKillIgnoreDurationToRetain;
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
index 926b80e2ab..84366cde0b 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
@@ -72,7 +72,8 @@ public class KillAuditLogTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
     killAuditLog.run(mockDruidCoordinatorRuntimeParams);
@@ -101,7 +102,8 @@ public class KillAuditLogTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
     killAuditLog.run(mockDruidCoordinatorRuntimeParams);
@@ -130,7 +132,8 @@ public class KillAuditLogTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     exception.expect(IllegalArgumentException.class);
     exception.expectMessage("coordinator audit kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod");
@@ -158,7 +161,8 @@ public class KillAuditLogTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     exception.expect(IllegalArgumentException.class);
     exception.expectMessage("coordinator audit kill retainDuration must be >= 
0");
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
index f2a224a631..a6e98c8051 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
@@ -104,7 +104,8 @@ public class KillCompactionConfigTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     killCompactionConfig = new KillCompactionConfig(
         druidCoordinatorConfig,
@@ -140,7 +141,8 @@ public class KillCompactionConfigTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     exception.expect(IllegalArgumentException.class);
     exception.expectMessage("Coordinator compaction configuration kill period 
must be >= druid.coordinator.period.metadataStoreManagementPeriod");
@@ -189,7 +191,8 @@ public class KillCompactionConfigTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     killCompactionConfig = new KillCompactionConfig(
         druidCoordinatorConfig,
@@ -294,7 +297,8 @@ public class KillCompactionConfigTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     killCompactionConfig = new KillCompactionConfig(
         druidCoordinatorConfig,
@@ -413,7 +417,8 @@ public class KillCompactionConfigTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     killCompactionConfig = new KillCompactionConfig(
         druidCoordinatorConfig,
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
index 0af57debf7..22b3ec0744 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
@@ -81,7 +81,8 @@ public class KillDatasourceMetadataTest
         new Duration(Long.MAX_VALUE),
         new Duration("PT1S"),
         10,
-        null
+        null,
+        false
     );
     killDatasourceMetadata = new 
KillDatasourceMetadata(druidCoordinatorConfig, 
mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
     killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
@@ -112,7 +113,8 @@ public class KillDatasourceMetadataTest
         new Duration("PT6S"),
         new Duration("PT1S"),
         10,
-        null
+        null,
+        false
     );
     killDatasourceMetadata = new 
KillDatasourceMetadata(druidCoordinatorConfig, 
mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
     killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
@@ -141,7 +143,8 @@ public class KillDatasourceMetadataTest
         new Duration("PT3S"),
         new Duration("PT1S"),
         10,
-        null
+        null,
+        false
     );
     exception.expect(IllegalArgumentException.class);
     exception.expectMessage("Coordinator datasource metadata kill period must 
be >= druid.coordinator.period.metadataStoreManagementPeriod");
@@ -169,7 +172,8 @@ public class KillDatasourceMetadataTest
         new Duration("PT6S"),
         new Duration("PT-1S"),
         10,
-        null
+        null,
+        false
     );
     exception.expect(IllegalArgumentException.class);
     exception.expectMessage("Coordinator datasource metadata kill 
retainDuration must be >= 0");
@@ -199,7 +203,8 @@ public class KillDatasourceMetadataTest
         new Duration("PT6S"),
         new Duration("PT1S"),
         10,
-        null
+        null,
+        false
     );
     killDatasourceMetadata = new 
KillDatasourceMetadata(druidCoordinatorConfig, 
mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
     killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
index 86a5d2c39b..0db43018a6 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
@@ -79,7 +79,8 @@ public class KillRulesTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     killRules = new KillRules(druidCoordinatorConfig);
     killRules.run(mockDruidCoordinatorRuntimeParams);
@@ -108,7 +109,8 @@ public class KillRulesTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     killRules = new KillRules(druidCoordinatorConfig);
     killRules.run(mockDruidCoordinatorRuntimeParams);
@@ -137,7 +139,8 @@ public class KillRulesTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     exception.expect(IllegalArgumentException.class);
     exception.expectMessage("coordinator rule kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod");
@@ -165,7 +168,8 @@ public class KillRulesTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     exception.expect(IllegalArgumentException.class);
     exception.expectMessage("coordinator rule kill retainDuration must be >= 
0");
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
index c5fa2d762b..d21b3afb70 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
@@ -72,7 +72,8 @@ public class KillSupervisorsTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     killSupervisors = new KillSupervisors(druidCoordinatorConfig, 
mockMetadataSupervisorManager);
     killSupervisors.run(mockDruidCoordinatorRuntimeParams);
@@ -101,7 +102,8 @@ public class KillSupervisorsTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     killSupervisors = new KillSupervisors(druidCoordinatorConfig, 
mockMetadataSupervisorManager);
     killSupervisors.run(mockDruidCoordinatorRuntimeParams);
@@ -130,7 +132,8 @@ public class KillSupervisorsTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     exception.expect(IllegalArgumentException.class);
     exception.expectMessage("Coordinator supervisor kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod");
@@ -158,7 +161,8 @@ public class KillSupervisorsTest
         null,
         null,
         10,
-        null
+        null,
+        false
     );
     exception.expect(IllegalArgumentException.class);
     exception.expectMessage("Coordinator supervisor kill retainDuration must 
be >= 0");
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index a1e9626931..3c9ebcc3a9 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.duty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
@@ -193,7 +194,8 @@ public class KillUnusedSegmentsTest
               null,
               null,
               1000,
-              Duration.ZERO
+              Duration.ZERO,
+              false
           )
       );
 
@@ -202,5 +204,169 @@ public class KillUnusedSegmentsTest
           unusedSegmentsKiller.findIntervalForKill("test", 10000)
       );
     }
+
+    /**
+     * Test that retainDuration is properly set based on the value available 
in the
+     * Coordinator config. Positive and negative durations should work, as 
should
+     * null if and only if ignoreDurationToRetain is true.
+     */
+    @Test
+    public void testRetainDurationValues()
+    {
+      // Positive duration to retain
+      KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
+          null,
+          null,
+          new TestDruidCoordinatorConfig(
+              null,
+              null,
+              Duration.parse("PT76400S"),
+              null,
+              new Duration(1),
+              Duration.parse("PT86400S"),
+              Duration.parse("PT86400S"),
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              1000,
+              Duration.ZERO,
+              false
+          )
+      );
+      Assert.assertEquals((Long) Duration.parse("PT86400S").getMillis(), 
unusedSegmentsKiller.getRetainDuration());
+
+      // Negative duration to retain
+      unusedSegmentsKiller = new KillUnusedSegments(
+          null,
+          null,
+          new TestDruidCoordinatorConfig(
+              null,
+              null,
+              Duration.parse("PT76400S"),
+              null,
+              new Duration(1),
+              Duration.parse("PT86400S"),
+              Duration.parse("PT-86400S"),
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              1000,
+              Duration.ZERO,
+              false
+          )
+      );
+      Assert.assertEquals((Long) Duration.parse("PT-86400S").getMillis(), 
unusedSegmentsKiller.getRetainDuration());
+    }
+
+    /**
+     * Test that the end time upper limit is properly computed for both 
positive and
+     * negative durations. Also ensure that if durationToRetain is to be 
ignored, that
+     * the upper limit is {@link DateTime} max time.
+     */
+    @Test
+    public void testGetEndTimeUpperLimit()
+    {
+      // If ignoreDurationToRetain is true, ignore the value configured for 
durationToRetain and return 9999-12-31T23:59
+      KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
+          null,
+          null,
+          new TestDruidCoordinatorConfig(
+              null,
+              null,
+              Duration.parse("PT76400S"),
+              null,
+              new Duration(1),
+              Duration.parse("PT86400S"),
+              Duration.parse("PT86400S"),
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              1000,
+              Duration.ZERO,
+              true
+          )
+      );
+      Assert.assertEquals(
+          DateTimes.COMPARE_DATE_AS_STRING_MAX,
+          unusedSegmentsKiller.getEndTimeUpperLimit()
+      );
+
+      // Testing a negative durationToRetain period returns proper date in 
future
+      unusedSegmentsKiller = new KillUnusedSegments(
+          null,
+          null,
+          new TestDruidCoordinatorConfig(
+              null,
+              null,
+              Duration.parse("PT76400S"),
+              null,
+              new Duration(1),
+              Duration.parse("PT86400S"),
+              Duration.parse("PT-86400S"),
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              1000,
+              Duration.ZERO,
+              false
+          )
+      );
+
+      DateTime expectedTime = 
DateTimes.nowUtc().minus(Duration.parse("PT-86400S").getMillis());
+      Assert.assertEquals(expectedTime, 
unusedSegmentsKiller.getEndTimeUpperLimit());
+
+      // Testing a positive durationToRetain period returns expected value in 
the past
+      unusedSegmentsKiller = new KillUnusedSegments(
+          null,
+          null,
+          new TestDruidCoordinatorConfig(
+              null,
+              null,
+              Duration.parse("PT76400S"),
+              null,
+              new Duration(1),
+              Duration.parse("PT86400S"),
+              Duration.parse("PT86400S"),
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              1000,
+              Duration.ZERO,
+              false
+          )
+      );
+      expectedTime = 
DateTimes.nowUtc().minus(Duration.parse("PT86400S").getMillis());
+      Assert.assertEquals(expectedTime, 
unusedSegmentsKiller.getEndTimeUpperLimit());
+    }
   }
 }
diff --git a/website/.spelling b/website/.spelling
index 7097c7bd26..d8d4cb0f8a 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1863,6 +1863,7 @@ druid_taskLock
 druid_taskLog
 druid_tasks
 DruidQueryRel
+durationToRetain
 ec2
 equalDistribution
 extractionFn


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to