kfaraz commented on code in PR #15941:
URL: https://github.com/apache/druid/pull/15941#discussion_r1499400507


##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -96,362 +104,805 @@ public class KillUnusedSegmentsTest
   private DataSegment nextDaySegment;
   private DataSegment nextMonthSegment;
 
-  private KillUnusedSegments target;
 
   @Before
   public void setup()
   {
-    
Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
-    Mockito.doReturn(stats).when(params).getCoordinatorStats();
-    
Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
-    
Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
-    
Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
-    
Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
-    
Mockito.doReturn(Duration.parse("PT3154000000S")).when(config).getCoordinatorKillBufferPeriod();
+    segmentsMetadataManager = new TestSegmentsMetadataManager();
+    overlordClient = new TestOverlordClient();
 
-    Mockito.doReturn(Collections.singleton(DATASOURCE))
-           
.when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn();
+    // These two can definitely be part of setup()
+    configBuilder = new TestDruidCoordinatorConfig.Builder()
+        .withCoordinatorIndexingPeriod(INDEXING_PERIOD)
+        .withCoordinatorKillPeriod(COORDINATOR_KILL_PERIOD)
+        .withCoordinatorKillDurationToRetain(DURATION_TO_RETAIN)
+        .withCoordinatorKillMaxSegments(MAX_SEGMENTS_TO_KILL)
+        .withCoordinatorKillBufferPeriod(BUFFER_PERIOD);
+    paramsBuilder = 
DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
+  }
 
-    final DateTime now = DateTimes.nowUtc();
+  /**
+   * The buffer period and duration to retain influence kill behavior.
+   */
+  @Test
+  public void testDefaults()
+  {
+    final DateTime sixtyDaysAgo = NOW.minusDays(60);
 
-    yearOldSegment = createSegmentWithEnd(now.minusDays(365));
-    monthOldSegment = createSegmentWithEnd(now.minusDays(30));
-    dayOldSegment = createSegmentWithEnd(now.minusDays(1));
-    hourOldSegment = createSegmentWithEnd(now.minusHours(1));
-    nextDaySegment = createSegmentWithEnd(now.plusDays(1));
-    nextMonthSegment = createSegmentWithEnd(now.plusDays(30));
-
-    final List<DataSegment> unusedSegments = ImmutableList.of(
-        yearOldSegment,
-        monthOldSegment,
-        dayOldSegment,
-        hourOldSegment,
-        nextDaySegment,
-        nextMonthSegment
-    );
-
-    Mockito.when(
-        segmentsMetadataManager.getUnusedSegmentIntervals(
-            ArgumentMatchers.anyString(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.anyInt(),
-            ArgumentMatchers.any()
-        )
-    ).thenAnswer(invocation -> {
-      DateTime minStartTime = invocation.getArgument(1);
-      DateTime maxEndTime = invocation.getArgument(2);
-      long maxEndMillis = maxEndTime.getMillis();
-      Long minStartMillis = minStartTime != null ? minStartTime.getMillis() : 
null;
-      List<Interval> unusedIntervals =
-          unusedSegments.stream()
-                        .map(DataSegment::getInterval)
-                        .filter(i -> i.getEnd().getMillis() <= maxEndMillis
-                                     && (null == minStartMillis || 
i.getStart().getMillis() >= minStartMillis))
-                        .collect(Collectors.toList());
-
-      int limit = invocation.getArgument(3);
-      return unusedIntervals.size() <= limit ? unusedIntervals : 
unusedIntervals.subList(0, limit);
-    });
-
-    target = new KillUnusedSegments(
+    createAndAddUnusedSegment(DS1, YEAR_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, MONTH_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, DAY_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, HOUR_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, NEXT_DAY, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, Intervals.ETERNITY, sixtyDaysAgo);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        new TestDruidCoordinatorConfig.Builder().build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 1L)),
+        firstRun.getCoordinatorStats()
     );
+
+    validateAndResetState(DS1, YEAR_OLD);
   }
 
   @Test
   public void testRunWithNoIntervalShouldNotKillAnySegments()
   {
-    
Mockito.doReturn(null).when(segmentsMetadataManager).getUnusedSegmentIntervals(
-        ArgumentMatchers.anyString(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.anyInt(),
-        ArgumentMatchers.any()
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
     );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    target.run(params);
-    Mockito.verify(overlordClient, Mockito.never())
-           .runKillTask(anyString(), anyString(), any(Interval.class), 
anyInt(), any(DateTime.class));
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
   }
 
   @Test
   public void 
testRunWithSpecificDatasourceAndNoIntervalShouldNotKillAnySegments()
   {
-    Mockito.doReturn(Duration.standardDays(400))
-           .when(config).getCoordinatorKillDurationToRetain();
-    target = new KillUnusedSegments(
+    
configBuilder.withCoordinatorKillDurationToRetain(Duration.standardDays(400));
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
     );
 
-    // No unused segment is older than the retention period
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    target.run(params);
-    Mockito.verify(overlordClient, Mockito.never())
-           .runKillTask(anyString(), anyString(), any(Interval.class), 
anyInt(), any(DateTime.class));
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
+  }
+
+  /**
+   * The kill period is honored after the first indexing run.
+   */
+  @Test
+  public void testRunsWithKillPeriod()
+  {
+    configBuilder.withCoordinatorKillPeriod(Duration.standardHours(1));
+
+    setupUnusedSegments();
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            yearOldSegment.getInterval().getStart(),
+            monthOldSegment.getInterval().getEnd()
+        )
+    );
+
+    final DruidCoordinatorRuntimeParams secondRun = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 1, 10,  ImmutableMap.of(DS1, 2L)),
+        secondRun.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, null);
+  }
+
+  /**
+   * Similar to {@link #testMultipleRuns()}
+   */
+  @Test
+  public void testMultipleDatasources()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(1));
+
+    createAndAddUnusedSegment(DS2, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, NEXT_DAY, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 2, 10, ImmutableMap.of(DS1, 2L, DS2, 2L)),
+        firstKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), 
MONTH_OLD.getEnd()));
+    validateAndResetState(DS2, new Interval(YEAR_OLD.getStart(), 
DAY_OLD.getEnd()));
+
+    final DruidCoordinatorRuntimeParams secondKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 4, 20,  ImmutableMap.of(DS1, 4L, DS2, 3L)),
+        secondKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(DAY_OLD.getStart(), 
NEXT_DAY.getEnd()));
+    validateAndResetState(DS2, NEXT_DAY);
+
+    final DruidCoordinatorRuntimeParams thirdKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(30, 5, 30, ImmutableMap.of(DS1, 5L, DS2, 3L)),
+        thirdKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, NEXT_MONTH);
+    validateAndResetState(DS2, null);
+  }
+
+  /**
+   * Even though "more recent" segments are also considered as candidates in 
the wide kill interval here,
+   * the kill task will narrow down to clean up only the segments that 
strictly honor the max kill time.
+   * See {@code 
KillUnusedSegmentsTaskTest#testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime}
+   * for example.
+   */
+  @Test
+  public void testSpreadOutLastMaxSegments()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillBufferPeriod(Duration.standardDays(3));
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(2));
+    createAndAddUnusedSegment(DS1, HOUR_OLD, NOW.minusDays(2));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(10));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 4L)),
+        firstKill.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), 
NEXT_MONTH.getEnd()));
+  }
+
+  @Test
+  public void testAddOldVersionsLater()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(DAY_OLD.getStart(), 
NEXT_DAY.getEnd()));
+
+    log.info("Second run...");
+    // Add two old unused segments now. These only get killed much later on 
when the kill
+    // duty eventually round robins its way through until the latest time.
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(1));
+
+    final DruidCoordinatorRuntimeParams secondKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 2, 20, ImmutableMap.of(DS1, 3L)),
+        secondKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, NEXT_MONTH);
+
+    log.info("Third run...");
+    final DruidCoordinatorRuntimeParams thirdKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(30, 2, 30, ImmutableMap.of(DS1, 3L)),
+        thirdKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+
+    log.info("Fourth run...");
+
+    final DruidCoordinatorRuntimeParams fourthKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(40, 3, 40, ImmutableMap.of(DS1, 5L)),
+        fourthKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), 
MONTH_OLD.getEnd()));
+  }
+
+  @Test
+  public void testNoDatasources()
+  {
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
+  }
+
+  @Test
+  public void testWhiteList()
+  {
+    // Only segments more than a day old are killed
+    
dynamicConfigBuilder.withSpecificDataSourcesToKillUnusedSegmentsIn(Collections.singleton(DS2));
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, YEAR_OLD, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 0L, DS2, 1L)),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS2, YEAR_OLD);
+    validateAndResetState(DS1, null);
   }
 
   @Test
   public void testDurationToRetain()
   {
     // Only segments more than a day old are killed
-    Interval expectedKillInterval = new Interval(
-        yearOldSegment.getInterval().getStart(),
-        dayOldSegment.getInterval().getEnd()
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            yearOldSegment.getInterval().getStart(), 
monthOldSegment.getInterval().getEnd())
     );
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(expectedKillInterval);
-    verifyState(ImmutableMap.of(DATASOURCE, 
dayOldSegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10);
   }
 
   @Test
   public void testNegativeDurationToRetain()
   {
-    // Duration to retain = -1 day, reinit target for config to take effect
-    Mockito.doReturn(DURATION_TO_RETAIN.negated())
-           .when(config).getCoordinatorKillDurationToRetain();
-    target = new KillUnusedSegments(
+    
configBuilder.withCoordinatorKillDurationToRetain(DURATION_TO_RETAIN.negated());
+
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 5L)),
+        runParams.getCoordinatorStats()
     );
 
-    // Segments upto 1 day in the future are killed
-    Interval expectedKillInterval = new Interval(
-        yearOldSegment.getInterval().getStart(),
-        nextDaySegment.getInterval().getEnd()
+    validateAndResetState(
+        DS1,
+        new Interval(yearOldSegment.getInterval().getStart(), 
nextDaySegment.getInterval().getEnd())
     );
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(expectedKillInterval);
-    verifyState(ImmutableMap.of(DATASOURCE, 
nextDaySegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10);
   }
 
   @Test
   public void testIgnoreDurationToRetain()
   {
-    Mockito.doReturn(true)
-           .when(config).getCoordinatorKillIgnoreDurationToRetain();
-    target = new KillUnusedSegments(
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 6L)),
+        runParams.getCoordinatorStats()
     );
 
-    // All future and past unused segments are killed
-    Interval expectedKillInterval = new Interval(
-        yearOldSegment.getInterval().getStart(),
-        nextMonthSegment.getInterval().getEnd()
+    // All past and future unused segments should be killed
+    validateAndResetState(
+        DS1,
+        new Interval(yearOldSegment.getInterval().getStart(), 
nextMonthSegment.getInterval().getEnd())
     );
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(expectedKillInterval);
-    verifyState(ImmutableMap.of(DATASOURCE, 
nextMonthSegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10);
   }
 
   @Test
   public void testMaxSegmentsToKill()
   {
-    Mockito.doReturn(1)
-           .when(config).getCoordinatorKillMaxSegments();
-    target = new KillUnusedSegments(
+    configBuilder.withCoordinatorKillMaxSegments(1);
+
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 1L)),
+        runParams.getCoordinatorStats()
     );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    // Only 1 unused segment is killed
-    runAndVerifyKillInterval(yearOldSegment.getInterval());
-    verifyState(ImmutableMap.of(DATASOURCE, 
yearOldSegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10);
+    validateAndResetState(
+        DS1,
+        yearOldSegment.getInterval()
+    );
   }
 
   @Test
   public void testMultipleRuns()
   {
-    Mockito.doReturn(true)
-        .when(config).getCoordinatorKillIgnoreDurationToRetain();
-    Mockito.doReturn(2)
-        .when(config).getCoordinatorKillMaxSegments();
-    target = new KillUnusedSegments(
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
+    );
+
+    log.info("First run...");
+    final DruidCoordinatorRuntimeParams firstRun = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            yearOldSegment.getInterval().getStart(),
+            monthOldSegment.getInterval().getEnd()
+        )
     );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(new Interval(
-        yearOldSegment.getInterval().getStart(),
-        monthOldSegment.getInterval().getEnd()
-    ));
-    verifyState(ImmutableMap.of(DATASOURCE, 
monthOldSegment.getInterval().getEnd()));
+    log.info("Second run...");
+    final DruidCoordinatorRuntimeParams secondRun = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 2, 20, ImmutableMap.of(DS1, 4L)),
+        secondRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            dayOldSegment.getInterval().getStart(),
+            hourOldSegment.getInterval().getEnd()
+        )
+    );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(new Interval(
-        dayOldSegment.getInterval().getStart(),
-        hourOldSegment.getInterval().getEnd()
-    ));
-    verifyState(ImmutableMap.of(DATASOURCE, 
hourOldSegment.getInterval().getEnd()));
+    log.info("Third run...");
+    final DruidCoordinatorRuntimeParams thirdRun = 
killDuty.run(paramsBuilder.build());
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(new Interval(
-        nextDaySegment.getInterval().getStart(),
-        nextMonthSegment.getInterval().getEnd()
-    ));
-    verifyState(ImmutableMap.of(DATASOURCE, 
nextMonthSegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10, 3);
+    validateStats(
+        new ExpectedStats(30, 3, 30, ImmutableMap.of(DS1, 6L)),
+        thirdRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            nextDaySegment.getInterval().getStart(),
+            nextMonthSegment.getInterval().getEnd()
+        )
+    );
   }
 
   @Test
   public void testKillTaskSlotRatioNoAvailableTaskCapacityForKill()
   {
-    mockTaskSlotUsage(0.10, 10, 1, 5);
-    runAndVerifyNoKill();
-    verifyState(ImmutableMap.of());
-    verifyStats(0, 0, 0);
+    dynamicConfigBuilder.withKillTaskSlotRatio(0.10);
+    dynamicConfigBuilder.withMaxKillTaskSlots(10);
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
+
+    overlordClient = new TestOverlordClient(1, 5);
+
+    overlordClient.addTask(DS1);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(0, 0, 0, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, null);
   }
 
   @Test
   public void testMaxKillTaskSlotsNoAvailableTaskCapacityForKill()
   {
-    mockTaskSlotUsage(1.0, 3, 3, 10);
-    runAndVerifyNoKill();
-    verifyState(ImmutableMap.of());
-    verifyStats(0, 0, 3);
+    dynamicConfigBuilder.withKillTaskSlotRatio(1.0);
+    dynamicConfigBuilder.withMaxKillTaskSlots(3);
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
+
+    overlordClient = new TestOverlordClient(3, 10);
+    overlordClient.addTask(DS1);
+    overlordClient.addTask(DS1);
+    overlordClient.addTask(DS1);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(0, 0, 3, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, null);
   }
 
   @Test
-  public void testGetKillTaskCapacity()
+  public void testKillTaskSlotStats1()
   {
-    Assert.assertEquals(
-        10,
-        KillUnusedSegments.getKillTaskCapacity(10, 1.0, Integer.MAX_VALUE)
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
     );
+    dynamicConfigBuilder.withKillTaskSlotRatio(1.0);
+    dynamicConfigBuilder.withMaxKillTaskSlots(Integer.MAX_VALUE);
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
 
-    Assert.assertEquals(
-        0,
-        KillUnusedSegments.getKillTaskCapacity(10, 0.0, Integer.MAX_VALUE)
-    );
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats());
+  }
 
-    Assert.assertEquals(
-        10,
-        KillUnusedSegments.getKillTaskCapacity(10, Double.POSITIVE_INFINITY, 
Integer.MAX_VALUE)
+  @Test
+  public void testKillTaskSlotStats2()
+  {
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
     );
+    dynamicConfigBuilder.withKillTaskSlotRatio(0.0);
+    dynamicConfigBuilder.withMaxKillTaskSlots(Integer.MAX_VALUE);
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
 
-    Assert.assertEquals(
-        0,
-        KillUnusedSegments.getKillTaskCapacity(10, 1.0, 0)
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(0, 0, 0, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
     );
+  }
 
-    Assert.assertEquals(
-        1,
-        KillUnusedSegments.getKillTaskCapacity(10, 0.1, 3)
+  @Test
+  public void testKillTaskSlotStats3()
+  {
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
     );
+    dynamicConfigBuilder.withKillTaskSlotRatio(1.0);
+    dynamicConfigBuilder.withMaxKillTaskSlots(0);
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
 
-    Assert.assertEquals(
-        2,
-        KillUnusedSegments.getKillTaskCapacity(10, 0.3, 2)
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(0, 0, 0, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
     );
   }
 
-  private void runAndVerifyKillInterval(Interval expectedKillInterval)
+  @Test
+  public void testKillTaskSlotStats4()
   {
-    int limit = config.getCoordinatorKillMaxSegments();
-    Mockito.doReturn(Futures.immediateFuture("ok"))
-           .when(overlordClient)
-           .runKillTask(
-               ArgumentMatchers.anyString(),
-               ArgumentMatchers.anyString(),
-               ArgumentMatchers.any(Interval.class),
-               ArgumentMatchers.anyInt(),
-               ArgumentMatchers.any(DateTime.class));
-    target.runInternal(params);
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+    dynamicConfigBuilder.withKillTaskSlotRatio(0.1);
+    dynamicConfigBuilder.withMaxKillTaskSlots(3);
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
 
-    Mockito.verify(overlordClient, Mockito.times(1)).runKillTask(
-        ArgumentMatchers.anyString(),
-        ArgumentMatchers.eq(DATASOURCE),
-        ArgumentMatchers.eq(expectedKillInterval),
-        ArgumentMatchers.eq(limit),
-        ArgumentMatchers.any()
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(1, 0, 1, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
     );
   }
 
-  private void verifyStats(int availableSlots, int submittedTasks, int 
maxSlots)
+  @Test
+  public void testKillTaskSlotStats5()
   {
-    verifyStats(availableSlots, submittedTasks, maxSlots, 1);
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+    dynamicConfigBuilder.withKillTaskSlotRatio(0.3);
+    dynamicConfigBuilder.withMaxKillTaskSlots(2);
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(2, 0, 2, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
   }
 
-  private void verifyStats(int availableSlots, int submittedTasks, int 
maxSlots, int times)
+  @Test
+  public void testKillFirstHalfEternitySegment()
   {
-    Mockito.verify(stats, 
Mockito.times(times)).add(Stats.Kill.AVAILABLE_SLOTS, availableSlots);
-    Mockito.verify(stats, 
Mockito.times(times)).add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
-    Mockito.verify(stats, Mockito.times(times)).add(Stats.Kill.MAX_SLOTS, 
maxSlots);
+    final DateTime sixtyDaysAgo = NOW.minusDays(60);
+
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+
+    final Interval firstHalfEternity = new Interval(DateTimes.MIN, 
DateTimes.of("2024"));
+    createAndAddUnusedSegment(DS1, firstHalfEternity, sixtyDaysAgo);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of()),
+        firstRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, firstHalfEternity);
   }
 
-  private void verifyState(Map<String, DateTime> 
expectedDatasourceToLastKillIntervalEnd)
+  /**
+   * Regardless of ignoreDurationToRetain configuration, auto-kill doesn't 
delete unused eternity segments because the
+   * unused segment retrieval code uses {@link 
DateTimes#COMPARE_DATE_AS_STRING_MAX} as the datetime string comparison
+   * for the interval end when retrieving unused segment intervals.
+   * TODO: link GitHub issue
+   */
+  @Ignore
+  @Test
+  public void testKillEternitySegment()
   {
-    Assert.assertEquals(expectedDatasourceToLastKillIntervalEnd, 
target.getDatasourceToLastKillIntervalEnd());
+    final DateTime sixtyDaysAgo = NOW.minusDays(60);
+
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+
+    createAndAddUnusedSegment(DS1, Intervals.ETERNITY, sixtyDaysAgo);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 1L)),
+        firstRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, Intervals.ETERNITY);
   }
 
-  private void runAndVerifyNoKill()
+  /**
+   * Similar to {@link #testKillEternitySegment()}
+   * <p>
+   * Regardless of ignoreDurationToRetain configuration, auto-kill doesn't 
delete unused segments whose interval ends at
+   * {@link DateTimes#MAX}. This is because the kill duty uses {@link 
DateTimes#COMPARE_DATE_AS_STRING_MAX} as the
+   * datetime string comparison for the end endpoint when retrieving unused 
segment intervals.
+   * TODO: link GitHub issue
+   * </p>
+   */
+  @Ignore
+  @Test
+  public void testKillSecondHalfEternitySegments()
   {
-    target.run(params);
-    Mockito.verify(overlordClient, Mockito.never()).runKillTask(
-        ArgumentMatchers.anyString(),
-        ArgumentMatchers.anyString(),
-        ArgumentMatchers.any(Interval.class),
-        ArgumentMatchers.anyInt(),
-        ArgumentMatchers.any(DateTime.class)
+    final DateTime sixtyDaysAgo = NOW.minusDays(60);
+
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    final Interval secondHalfEternity = new Interval(DateTimes.of("1970"), 
DateTimes.MAX);
+
+    createAndAddUnusedSegment(DS1, secondHalfEternity, sixtyDaysAgo);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 1L)),
+        firstRun.getCoordinatorStats()
     );
+
+    validateAndResetState(DS1, secondHalfEternity);
   }
 
-  private void mockTaskSlotUsage(
-      double killTaskSlotRatio,
-      int maxKillTaskSlots,
-      int numPendingCoordKillTasks,
-      int maxWorkerCapacity
-  )
+  private void validateStats(final ExpectedStats expectedStats, final 
CoordinatorRunStats actualRunStats)
+  {
+    Assert.assertEquals(expectedStats.availableSlots, 
actualRunStats.get(Stats.Kill.AVAILABLE_SLOTS));
+    Assert.assertEquals(expectedStats.submittedTasks, 
actualRunStats.get(Stats.Kill.SUBMITTED_TASKS));
+    Assert.assertEquals(expectedStats.maxSlots, 
actualRunStats.get(Stats.Kill.MAX_SLOTS));
+
+    for (final Map.Entry<String, Long> expectedEntry : 
expectedStats.dataSourceToCandidateSegments.entrySet()) {
+      Assert.assertEquals(
+          expectedEntry.getKey(),
+          expectedEntry.getValue().longValue(),
+          actualRunStats.get(
+              Stats.Kill.CANDIDATE_SEGMENTS_KILLED,
+              RowKey.of(Dimension.DATASOURCE, expectedEntry.getKey())
+          )
+      );
+    }
+  }
+
+  private void validateAndResetState(final String dataSource, @Nullable final 
Interval expectedKillInterval)
   {
-    Mockito.doReturn(killTaskSlotRatio)
-        .when(coordinatorDynamicConfig).getKillTaskSlotRatio();
-    Mockito.doReturn(maxKillTaskSlots)
-        .when(coordinatorDynamicConfig).getMaxKillTaskSlots();
-    Mockito.doReturn(Futures.immediateFuture(new 
IndexingTotalWorkerCapacityInfo(1, maxWorkerCapacity)))
-        .when(overlordClient)
-        .getTotalWorkerCapacity();
-    List<TaskStatusPlus> runningCoordinatorIssuedKillTasks = new ArrayList<>();
-    for (int i = 0; i < numPendingCoordKillTasks; i++) {
-      runningCoordinatorIssuedKillTasks.add(new TaskStatusPlus(
-          KillUnusedSegments.TASK_ID_PREFIX + "_taskId_" + i,
-          "groupId_" + i,
-          KillUnusedSegments.KILL_TASK_TYPE,
-          DateTimes.EPOCH,
-          DateTimes.EPOCH,
-          TaskState.RUNNING,
-          RunnerTaskState.RUNNING,
-          -1L,
-          TaskLocation.unknown(),
-          "datasource",
-          null
-      ));
+    final Interval observedLastKillInterval = 
overlordClient.getLastKillInterval(dataSource);
+    final String observedLastKillTaskId = 
overlordClient.getLastKillTaskId(dataSource);
+
+    Assert.assertEquals(
+        expectedKillInterval,
+        observedLastKillInterval
+    );
+
+    String expectedKillTaskId = null;
+    if (expectedKillInterval != null) {
+      expectedKillTaskId = TestOverlordClient.getTaskId(
+          KillUnusedSegments.TASK_ID_PREFIX,
+          dataSource,
+          expectedKillInterval
+      );
     }
-    Mockito.doReturn(Futures.immediateFuture(
-            
CloseableIterators.withEmptyBaggage(runningCoordinatorIssuedKillTasks.iterator())))
-        .when(overlordClient)
-        .taskStatuses(null, null, 0);
+
+    Assert.assertEquals(
+        expectedKillTaskId,
+        observedLastKillTaskId
+    );

Review Comment:
   This `Assert.assertEquals(expectedKillTaskId, observedLastKillTaskId)` call can be written on a single line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to