abhishekrb19 commented on code in PR #15941:
URL: https://github.com/apache/druid/pull/15941#discussion_r1500196710


##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -96,362 +104,805 @@ public class KillUnusedSegmentsTest
   private DataSegment nextDaySegment;
   private DataSegment nextMonthSegment;
 
-  private KillUnusedSegments target;
 
   @Before
   public void setup()
   {
-    
Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
-    Mockito.doReturn(stats).when(params).getCoordinatorStats();
-    
Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
-    
Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
-    
Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
-    
Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
-    
Mockito.doReturn(Duration.parse("PT3154000000S")).when(config).getCoordinatorKillBufferPeriod();
+    segmentsMetadataManager = new TestSegmentsMetadataManager();
+    overlordClient = new TestOverlordClient();
 
-    Mockito.doReturn(Collections.singleton(DATASOURCE))
-           
.when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn();
+    // These two can definitely be part of setup()
+    configBuilder = new TestDruidCoordinatorConfig.Builder()
+        .withCoordinatorIndexingPeriod(INDEXING_PERIOD)
+        .withCoordinatorKillPeriod(COORDINATOR_KILL_PERIOD)
+        .withCoordinatorKillDurationToRetain(DURATION_TO_RETAIN)
+        .withCoordinatorKillMaxSegments(MAX_SEGMENTS_TO_KILL)
+        .withCoordinatorKillBufferPeriod(BUFFER_PERIOD);
+    paramsBuilder = 
DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
+  }
 
-    final DateTime now = DateTimes.nowUtc();
+  /**
+   * The buffer period and duration to retain influence kill behavior.
+   */
+  @Test
+  public void testDefaults()
+  {
+    final DateTime sixtyDaysAgo = NOW.minusDays(60);
 
-    yearOldSegment = createSegmentWithEnd(now.minusDays(365));
-    monthOldSegment = createSegmentWithEnd(now.minusDays(30));
-    dayOldSegment = createSegmentWithEnd(now.minusDays(1));
-    hourOldSegment = createSegmentWithEnd(now.minusHours(1));
-    nextDaySegment = createSegmentWithEnd(now.plusDays(1));
-    nextMonthSegment = createSegmentWithEnd(now.plusDays(30));
-
-    final List<DataSegment> unusedSegments = ImmutableList.of(
-        yearOldSegment,
-        monthOldSegment,
-        dayOldSegment,
-        hourOldSegment,
-        nextDaySegment,
-        nextMonthSegment
-    );
-
-    Mockito.when(
-        segmentsMetadataManager.getUnusedSegmentIntervals(
-            ArgumentMatchers.anyString(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.anyInt(),
-            ArgumentMatchers.any()
-        )
-    ).thenAnswer(invocation -> {
-      DateTime minStartTime = invocation.getArgument(1);
-      DateTime maxEndTime = invocation.getArgument(2);
-      long maxEndMillis = maxEndTime.getMillis();
-      Long minStartMillis = minStartTime != null ? minStartTime.getMillis() : 
null;
-      List<Interval> unusedIntervals =
-          unusedSegments.stream()
-                        .map(DataSegment::getInterval)
-                        .filter(i -> i.getEnd().getMillis() <= maxEndMillis
-                                     && (null == minStartMillis || 
i.getStart().getMillis() >= minStartMillis))
-                        .collect(Collectors.toList());
-
-      int limit = invocation.getArgument(3);
-      return unusedIntervals.size() <= limit ? unusedIntervals : 
unusedIntervals.subList(0, limit);
-    });
-
-    target = new KillUnusedSegments(
+    createAndAddUnusedSegment(DS1, YEAR_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, MONTH_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, DAY_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, HOUR_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, NEXT_DAY, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, Intervals.ETERNITY, sixtyDaysAgo);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        new TestDruidCoordinatorConfig.Builder().build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 1L)),
+        firstRun.getCoordinatorStats()
     );
+
+    validateAndResetState(DS1, YEAR_OLD);
   }
 
   @Test
   public void testRunWithNoIntervalShouldNotKillAnySegments()
   {
-    
Mockito.doReturn(null).when(segmentsMetadataManager).getUnusedSegmentIntervals(
-        ArgumentMatchers.anyString(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.anyInt(),
-        ArgumentMatchers.any()
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
     );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    target.run(params);
-    Mockito.verify(overlordClient, Mockito.never())
-           .runKillTask(anyString(), anyString(), any(Interval.class), 
anyInt(), any(DateTime.class));
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
   }
 
   @Test
   public void 
testRunWithSpecificDatasourceAndNoIntervalShouldNotKillAnySegments()
   {
-    Mockito.doReturn(Duration.standardDays(400))
-           .when(config).getCoordinatorKillDurationToRetain();
-    target = new KillUnusedSegments(
+    
configBuilder.withCoordinatorKillDurationToRetain(Duration.standardDays(400));
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
     );
 
-    // No unused segment is older than the retention period
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    target.run(params);
-    Mockito.verify(overlordClient, Mockito.never())
-           .runKillTask(anyString(), anyString(), any(Interval.class), 
anyInt(), any(DateTime.class));
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
+  }
+
+  /**
+   * The kill period is honored after the first indexing run.
+   */
+  @Test
+  public void testRunsWithKillPeriod()
+  {
+    configBuilder.withCoordinatorKillPeriod(Duration.standardHours(1));
+
+    setupUnusedSegments();
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            yearOldSegment.getInterval().getStart(),
+            monthOldSegment.getInterval().getEnd()
+        )
+    );
+
+    final DruidCoordinatorRuntimeParams secondRun = 
killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 1, 10,  ImmutableMap.of(DS1, 2L)),
+        secondRun.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, null);
+  }
+
+  /**
+   * Similar to {@link #testMultipleRuns()}
+   */
+  @Test
+  public void testMultipleDatasources()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(1));
+
+    createAndAddUnusedSegment(DS2, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, NEXT_DAY, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 2, 10, ImmutableMap.of(DS1, 2L, DS2, 2L)),
+        firstKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), 
MONTH_OLD.getEnd()));
+    validateAndResetState(DS2, new Interval(YEAR_OLD.getStart(), 
DAY_OLD.getEnd()));
+
+    final DruidCoordinatorRuntimeParams secondKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 4, 20,  ImmutableMap.of(DS1, 4L, DS2, 3L)),
+        secondKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(DAY_OLD.getStart(), 
NEXT_DAY.getEnd()));
+    validateAndResetState(DS2, NEXT_DAY);
+
+    final DruidCoordinatorRuntimeParams thirdKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(30, 5, 30, ImmutableMap.of(DS1, 5L, DS2, 3L)),
+        thirdKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, NEXT_MONTH);
+    validateAndResetState(DS2, null);
+  }
+
+  /**
+   * Even though "more recent" segments are also considered as candidates in 
the wide kill interval here,
+   * the kill task will narrow down to clean up only the segments that 
strictly honor the max kill time.
+   * See {@code 
KillUnusedSegmentsTaskTest#testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime}
+   * for example.
+   */
+  @Test
+  public void testSpreadOutLastMaxSegments()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillBufferPeriod(Duration.standardDays(3));
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(2));
+    createAndAddUnusedSegment(DS1, HOUR_OLD, NOW.minusDays(2));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(10));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 4L)),
+        firstKill.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), 
NEXT_MONTH.getEnd()));
+  }
+
+  @Test
+  public void testAddOldVersionsLater()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(DAY_OLD.getStart(), 
NEXT_DAY.getEnd()));
+
+    log.info("Second run...");
+    // Add two old unused segments now. These only get killed much later on 
when the kill
+    // duty eventually round robins its way through until the latest time.
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(1));
+
+    final DruidCoordinatorRuntimeParams secondKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 2, 20, ImmutableMap.of(DS1, 3L)),
+        secondKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, NEXT_MONTH);
+
+    log.info("Third run...");
+    final DruidCoordinatorRuntimeParams thirdKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(30, 2, 30, ImmutableMap.of(DS1, 3L)),
+        thirdKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+
+    log.info("Fourth run...");
+
+    final DruidCoordinatorRuntimeParams fourthKill = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(40, 3, 40, ImmutableMap.of(DS1, 5L)),
+        fourthKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), 
MONTH_OLD.getEnd()));
+  }
+
+  @Test
+  public void testNoDatasources()
+  {
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
+  }
+
+  @Test
+  public void testWhiteList()
+  {
+    // Only segments more than a day old are killed
+    
dynamicConfigBuilder.withSpecificDataSourcesToKillUnusedSegmentsIn(Collections.singleton(DS2));
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, YEAR_OLD, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 0L, DS2, 1L)),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS2, YEAR_OLD);
+    validateAndResetState(DS1, null);
   }
 
   @Test
   public void testDurationToRetain()
   {
     // Only segments more than a day old are killed
-    Interval expectedKillInterval = new Interval(
-        yearOldSegment.getInterval().getStart(),
-        dayOldSegment.getInterval().getEnd()
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            yearOldSegment.getInterval().getStart(), 
monthOldSegment.getInterval().getEnd())
     );
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(expectedKillInterval);
-    verifyState(ImmutableMap.of(DATASOURCE, 
dayOldSegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10);
   }
 
   @Test
   public void testNegativeDurationToRetain()
   {
-    // Duration to retain = -1 day, reinit target for config to take effect
-    Mockito.doReturn(DURATION_TO_RETAIN.negated())
-           .when(config).getCoordinatorKillDurationToRetain();
-    target = new KillUnusedSegments(
+    
configBuilder.withCoordinatorKillDurationToRetain(DURATION_TO_RETAIN.negated());
+
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 5L)),
+        runParams.getCoordinatorStats()
     );
 
-    // Segments up to 1 day in the future are killed
-    Interval expectedKillInterval = new Interval(
-        yearOldSegment.getInterval().getStart(),
-        nextDaySegment.getInterval().getEnd()
+    validateAndResetState(
+        DS1,
+        new Interval(yearOldSegment.getInterval().getStart(), 
nextDaySegment.getInterval().getEnd())
     );
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(expectedKillInterval);
-    verifyState(ImmutableMap.of(DATASOURCE, 
nextDaySegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10);
   }
 
   @Test
   public void testIgnoreDurationToRetain()
   {
-    Mockito.doReturn(true)
-           .when(config).getCoordinatorKillIgnoreDurationToRetain();
-    target = new KillUnusedSegments(
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 6L)),
+        runParams.getCoordinatorStats()
     );
 
-    // All future and past unused segments are killed
-    Interval expectedKillInterval = new Interval(
-        yearOldSegment.getInterval().getStart(),
-        nextMonthSegment.getInterval().getEnd()
+    // All past and future unused segments should be killed
+    validateAndResetState(
+        DS1,
+        new Interval(yearOldSegment.getInterval().getStart(), 
nextMonthSegment.getInterval().getEnd())
     );
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(expectedKillInterval);
-    verifyState(ImmutableMap.of(DATASOURCE, 
nextMonthSegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10);
   }
 
   @Test
   public void testMaxSegmentsToKill()
   {
-    Mockito.doReturn(1)
-           .when(config).getCoordinatorKillMaxSegments();
-    target = new KillUnusedSegments(
+    configBuilder.withCoordinatorKillMaxSegments(1);
+
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 1L)),
+        runParams.getCoordinatorStats()
     );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    // Only 1 unused segment is killed
-    runAndVerifyKillInterval(yearOldSegment.getInterval());
-    verifyState(ImmutableMap.of(DATASOURCE, 
yearOldSegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10);
+    validateAndResetState(
+        DS1,
+        yearOldSegment.getInterval()
+    );
   }
 
   @Test
   public void testMultipleRuns()
   {
-    Mockito.doReturn(true)
-        .when(config).getCoordinatorKillIgnoreDurationToRetain();
-    Mockito.doReturn(2)
-        .when(config).getCoordinatorKillMaxSegments();
-    target = new KillUnusedSegments(
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
+    );
+
+    log.info("First run...");
+    final DruidCoordinatorRuntimeParams firstRun = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            yearOldSegment.getInterval().getStart(),
+            monthOldSegment.getInterval().getEnd()
+        )
     );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(new Interval(
-        yearOldSegment.getInterval().getStart(),
-        monthOldSegment.getInterval().getEnd()
-    ));
-    verifyState(ImmutableMap.of(DATASOURCE, 
monthOldSegment.getInterval().getEnd()));
+    log.info("Second run...");
+    final DruidCoordinatorRuntimeParams secondRun = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 2, 20, ImmutableMap.of(DS1, 4L)),
+        secondRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            dayOldSegment.getInterval().getStart(),
+            hourOldSegment.getInterval().getEnd()
+        )
+    );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(new Interval(
-        dayOldSegment.getInterval().getStart(),
-        hourOldSegment.getInterval().getEnd()
-    ));
-    verifyState(ImmutableMap.of(DATASOURCE, 
hourOldSegment.getInterval().getEnd()));
+    log.info("Third run...");
+    final DruidCoordinatorRuntimeParams thirdRun = 
killDuty.run(paramsBuilder.build());
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(new Interval(
-        nextDaySegment.getInterval().getStart(),
-        nextMonthSegment.getInterval().getEnd()
-    ));
-    verifyState(ImmutableMap.of(DATASOURCE, 
nextMonthSegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10, 3);
+    validateStats(
+        new ExpectedStats(30, 3, 30, ImmutableMap.of(DS1, 6L)),
+        thirdRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            nextDaySegment.getInterval().getStart(),
+            nextMonthSegment.getInterval().getEnd()
+        )
+    );
   }
 
   @Test
   public void testKillTaskSlotRatioNoAvailableTaskCapacityForKill()
   {
-    mockTaskSlotUsage(0.10, 10, 1, 5);
-    runAndVerifyNoKill();
-    verifyState(ImmutableMap.of());
-    verifyStats(0, 0, 0);
+    dynamicConfigBuilder.withKillTaskSlotRatio(0.10);
+    dynamicConfigBuilder.withMaxKillTaskSlots(10);
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
+
+    overlordClient = new TestOverlordClient(1, 5);
+
+    overlordClient.addTask(DS1);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(0, 0, 0, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, null);
   }
 
   @Test
   public void testMaxKillTaskSlotsNoAvailableTaskCapacityForKill()
   {
-    mockTaskSlotUsage(1.0, 3, 3, 10);
-    runAndVerifyNoKill();
-    verifyState(ImmutableMap.of());
-    verifyStats(0, 0, 3);
+    dynamicConfigBuilder.withKillTaskSlotRatio(1.0);
+    dynamicConfigBuilder.withMaxKillTaskSlots(3);
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
+
+    overlordClient = new TestOverlordClient(3, 10);
+    overlordClient.addTask(DS1);
+    overlordClient.addTask(DS1);
+    overlordClient.addTask(DS1);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = 
killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(0, 0, 3, ImmutableMap.of()),

Review Comment:
   Yes, definitely, it reads more clearly now!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to