kfaraz commented on code in PR #15941:
URL: https://github.com/apache/druid/pull/15941#discussion_r1499331990


##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java:
##########
@@ -34,33 +36,69 @@
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 
 public class TestSegmentsMetadataManager implements SegmentsMetadataManager
 {
+  private static final Logger log = new Logger(TestSegmentsMetadataManager.class);
+
+  /**
+   * segments = usedSegments + unusedSegments. Tracking unusedSegments separately for convenience and
+   * additional metadata.
+   */
   private final ConcurrentMap<String, DataSegment> segments = new ConcurrentHashMap<>();

Review Comment:
   Nit: The comment above doesn't really seem to add a lot of value. You could just call this field `allSegments` and be done with it.



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java:
##########
@@ -34,33 +36,69 @@
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 
 public class TestSegmentsMetadataManager implements SegmentsMetadataManager
 {
+  private static final Logger log = new Logger(TestSegmentsMetadataManager.class);
+
+  /**
+   * segments = usedSegments + unusedSegments. Tracking unusedSegments separately for convenience and
+   * additional metadata.
+   */
   private final ConcurrentMap<String, DataSegment> segments = new ConcurrentHashMap<>();
   private final ConcurrentMap<String, DataSegment> usedSegments = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, DataSegmentPlus> unusedSegments = new ConcurrentHashMap<>();
 
   private volatile DataSourcesSnapshot snapshot;
 
-  public void addSegment(DataSegment segment)
+  /**
+   * Adds the used segment.

Review Comment:
   Doesn't add any new info.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -282,41 +300,16 @@ private Interval findIntervalForKill(String dataSource, DateTime maxUsedStatusLa
 
   private int getAvailableKillTaskSlots(int killTaskCapacity, int numActiveKillTasks)
   {
-    return Math.max(
-        0,
-        killTaskCapacity - numActiveKillTasks
-    );
+    return Math.max(0, killTaskCapacity - numActiveKillTasks);
   }
 
   private boolean canDutyRun()
   {
     return lastKillTime == null || !DateTimes.nowUtc().isBefore(lastKillTime.plus(period));
   }
 
-  @VisibleForTesting
-  static int getKillTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio, int maxKillTaskSlots)
+  private static int getKillTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio, int maxKillTaskSlots)

Review Comment:
   Nit: If this is not visible anymore, does it still need to be static?
   
   This method can be simplified by passing only `CoordinatorDynamicConfig` to it.
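A sketch of what the suggested simplification might look like. The `DynamicConfig` class here is a hypothetical stand-in for Druid's `CoordinatorDynamicConfig`, and the ratio-then-cap clamping mirrors the behavior discussed above rather than the exact Druid implementation:

```java
// Stand-in for Druid's CoordinatorDynamicConfig, reduced to the two knobs used here.
class DynamicConfig
{
  final double killTaskSlotRatio;
  final int maxKillTaskSlots;

  DynamicConfig(double killTaskSlotRatio, int maxKillTaskSlots)
  {
    this.killTaskSlotRatio = killTaskSlotRatio;
    this.maxKillTaskSlots = maxKillTaskSlots;
  }
}

class KillTaskSlots
{
  // Capacity = total worker pool scaled by the ratio, clamped by the hard cap.
  static int getKillTaskCapacity(int totalWorkerCapacity, DynamicConfig config)
  {
    return Math.min(
        (int) (totalWorkerCapacity * config.killTaskSlotRatio),
        config.maxKillTaskSlots
    );
  }
}
```

With the config object passed in, the call site no longer needs to unpack `killTaskSlotRatio` and `maxKillTaskSlots` separately.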



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -146,130 +154,140 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
     return runInternal(params);
   }
 
-  @VisibleForTesting
-  DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params)
+  private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeParams params)
   {
-    TaskStats taskStats = new TaskStats();
     Collection<String> dataSourcesToKill =
         params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
-    double killTaskSlotRatio = params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
-    int maxKillTaskSlots = params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int killTaskCapacity = getKillTaskCapacity(
+    final double killTaskSlotRatio = params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
+    final int maxKillTaskSlots = params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
+    final int killTaskCapacity = getKillTaskCapacity(
         CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
         killTaskSlotRatio,
         maxKillTaskSlots
     );
-    int availableKillTaskSlots = getAvailableKillTaskSlots(
+    final int availableKillTaskSlots = getAvailableKillTaskSlots(
         killTaskCapacity,
         CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, IS_AUTO_KILL_TASK).size()
     );
     final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    taskStats.availableTaskSlots = availableKillTaskSlots;
-    taskStats.maxSlots = killTaskCapacity;
-
     if (0 < availableKillTaskSlots) {
       // If no datasource has been specified, all are eligible for killing unused segments
       if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
         dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
       }
 
-      log.debug("Killing unused segments for datasources[%s]", dataSourcesToKill);
       lastKillTime = DateTimes.nowUtc();
-      taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
+      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
     }
 
     // any datasources that are no longer being considered for kill should have their
     // last kill interval removed from map.
     datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
-    addStats(taskStats, stats);
+    
+    stats.add(Stats.Kill.AVAILABLE_SLOTS, availableKillTaskSlots);
+    stats.add(Stats.Kill.MAX_SLOTS, killTaskCapacity);
     return params;
   }
 
-  private void addStats(
-      TaskStats taskStats,
-      CoordinatorRunStats stats
-  )
-  {
-    stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
-    stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
-    stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
-  }
-
-  private int killUnusedSegments(
-      Collection<String> dataSourcesToKill,
-      int availableKillTaskSlots
+  /**
+   * Spawn kill tasks for each datasource to be killed upto {@code availableKillTaskSlots}.
+   * @param dataSourcesToKill finalized set of datasources to kill
+   * @param availableKillTaskSlots number of available kill task slots

Review Comment:
   Not adding any new info. Not really needed as this is just a private method.



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -96,362 +104,805 @@ public class KillUnusedSegmentsTest
   private DataSegment nextDaySegment;
   private DataSegment nextMonthSegment;
 
-  private KillUnusedSegments target;
 
   @Before
   public void setup()
   {
-    Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
-    Mockito.doReturn(stats).when(params).getCoordinatorStats();
-    Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
-    Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
-    Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
-    Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
-    Mockito.doReturn(Duration.parse("PT3154000000S")).when(config).getCoordinatorKillBufferPeriod();
+    segmentsMetadataManager = new TestSegmentsMetadataManager();
+    overlordClient = new TestOverlordClient();
 
-    Mockito.doReturn(Collections.singleton(DATASOURCE))
-           .when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn();
+    // These two can definitely be part of setup()

Review Comment:
   Please remove this.



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -44,50 +53,49 @@
 import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Set;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
 
 /**
- *
+ * Unit tests to test the {@link KillUnusedSegments} duty.
  */
-@RunWith(MockitoJUnitRunner.class)
 public class KillUnusedSegmentsTest
 {
-  private static final Duration INDEXING_PERIOD = Duration.standardSeconds(1);
-  private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardSeconds(1);
-  private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1);
+  private static final Logger log = new Logger(KillUnusedSegmentsTest.class);
+  private static final Duration INDEXING_PERIOD = Duration.standardSeconds(0);
+  private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardSeconds(0);
+  private static final Duration DURATION_TO_RETAIN = Duration.standardHours(36);
+  private static final Duration BUFFER_PERIOD = Duration.standardSeconds(1);
   private static final int MAX_SEGMENTS_TO_KILL = 10;
-  private static final String DATASOURCE = "DS1";
-
-  @Mock
-  private SegmentsMetadataManager segmentsMetadataManager;
-  @Mock
-  private OverlordClient overlordClient;
-  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-  private DruidCoordinatorConfig config;
-
-  @Mock
-  private CoordinatorRunStats stats;
-  @Mock
-  private DruidCoordinatorRuntimeParams params;
-  @Mock
-  private CoordinatorDynamicConfig coordinatorDynamicConfig;
+  private static final String DS1 = "DS1";
+  private static final String DS2 = "DS2";
+
+  private TestSegmentsMetadataManager segmentsMetadataManager;
+
+  private TestOverlordClient overlordClient;
+
+  private TestDruidCoordinatorConfig.Builder configBuilder;
+
+  private DruidCoordinatorRuntimeParams.Builder paramsBuilder;
+
+  private final CoordinatorDynamicConfig.Builder dynamicConfigBuilder = CoordinatorDynamicConfig.builder();
+
+  private static final DateTime NOW = DateTimes.nowUtc();
+  private static final Interval YEAR_OLD = new Interval(Period.days(1), NOW.minusDays(365));

Review Comment:
   More apt to call these `YEAR_TILL_DATE`, `MONTH_TILL_DATE` or something. `YEAR_OLD` is ambiguous. Also, all the periods are currently set to a day.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -146,130 +154,140 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
     return runInternal(params);
   }
 
-  @VisibleForTesting

Review Comment:
   Thanks for getting rid of this!



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java:
##########
@@ -34,33 +36,69 @@
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 
 public class TestSegmentsMetadataManager implements SegmentsMetadataManager
 {
+  private static final Logger log = new Logger(TestSegmentsMetadataManager.class);
+
+  /**
+   * segments = usedSegments + unusedSegments. Tracking unusedSegments separately for convenience and
+   * additional metadata.
+   */
   private final ConcurrentMap<String, DataSegment> segments = new ConcurrentHashMap<>();
   private final ConcurrentMap<String, DataSegment> usedSegments = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, DataSegmentPlus> unusedSegments = new ConcurrentHashMap<>();
 
   private volatile DataSourcesSnapshot snapshot;
 
-  public void addSegment(DataSegment segment)
+  /**
+   * Adds the used segment.
+   */
+  public void addSegment(DataSegment segment, boolean isUsed)
   {
     segments.put(segment.getId().toString(), segment);
     usedSegments.put(segment.getId().toString(), segment);
     snapshot = null;
   }
 
+  /**
+   * Remove the used segment.
+   */
   public void removeSegment(DataSegment segment)
   {
     segments.remove(segment.getId().toString());
     usedSegments.remove(segment.getId().toString());
     snapshot = null;
   }
 
+  /**
+   * Add the unused segment with metadata.

Review Comment:
   I don't think any of these comments are really needed. The methods are fairly self-explanatory.



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -96,362 +104,805 @@ public class KillUnusedSegmentsTest
   private DataSegment nextDaySegment;
   private DataSegment nextMonthSegment;
 
-  private KillUnusedSegments target;
 
   @Before
   public void setup()
   {
-    Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
-    Mockito.doReturn(stats).when(params).getCoordinatorStats();
-    Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
-    Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
-    Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
-    Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
-    Mockito.doReturn(Duration.parse("PT3154000000S")).when(config).getCoordinatorKillBufferPeriod();
+    segmentsMetadataManager = new TestSegmentsMetadataManager();
+    overlordClient = new TestOverlordClient();
 
-    Mockito.doReturn(Collections.singleton(DATASOURCE))
-           .when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn();
+    // These two can definitely be part of setup()
+    configBuilder = new TestDruidCoordinatorConfig.Builder()
+        .withCoordinatorIndexingPeriod(INDEXING_PERIOD)
+        .withCoordinatorKillPeriod(COORDINATOR_KILL_PERIOD)
+        .withCoordinatorKillDurationToRetain(DURATION_TO_RETAIN)
+        .withCoordinatorKillMaxSegments(MAX_SEGMENTS_TO_KILL)
+        .withCoordinatorKillBufferPeriod(BUFFER_PERIOD);
+    paramsBuilder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
+  }
 
-    final DateTime now = DateTimes.nowUtc();
+  /**
+   * The buffer periood and duration to retain influence kill behavior.
+   */
+  @Test
+  public void testDefaults()
+  {
+    final DateTime sixtyDaysAgo = NOW.minusDays(60);
 
-    yearOldSegment = createSegmentWithEnd(now.minusDays(365));

Review Comment:
   Just calling this out in case it got missed in the refactor.
   The original segments had their _end_ a year ago from now, a month ago from now and so on.
   The new intervals `YEAR_OLD`, `MONTH_OLD` actually have their end at now.
   Please ensure that the tests have been updated to incorporate that.
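For anyone following along: the Joda-Time constructor in question anchors the *end* of the interval and derives the start by subtracting the period. A minimal `java.time` sketch of the same arithmetic (the dates are arbitrary, and this is an illustration, not code from the PR):

```java
import java.time.Duration;
import java.time.Instant;

class IntervalEndDemo
{
  // Mirrors Joda's Interval(ReadablePeriod, ReadableInstant end):
  // start = end - period, so the supplied instant is the interval's end.
  static Instant startOf(Duration period, Instant end)
  {
    return end.minus(period);
  }
}
```

So `new Interval(Period.days(1), NOW.minusDays(365))` spans exactly one day ending 365 days before `NOW`, which is why the end-anchoring matters for these test constants.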



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java:
##########
@@ -34,33 +36,69 @@
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 
 public class TestSegmentsMetadataManager implements SegmentsMetadataManager
 {
+  private static final Logger log = new Logger(TestSegmentsMetadataManager.class);
+
+  /**
+   * segments = usedSegments + unusedSegments. Tracking unusedSegments separately for convenience and
+   * additional metadata.
+   */
   private final ConcurrentMap<String, DataSegment> segments = new ConcurrentHashMap<>();
   private final ConcurrentMap<String, DataSegment> usedSegments = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, DataSegmentPlus> unusedSegments = new ConcurrentHashMap<>();
 
   private volatile DataSourcesSnapshot snapshot;
 
-  public void addSegment(DataSegment segment)
+  /**
+   * Adds the used segment.
+   */
+  public void addSegment(DataSegment segment, boolean isUsed)

Review Comment:
   Where is the boolean `isUsed` used?
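   One way the flag could be wired up (a hypothetical sketch, not the PR's code; `String` stands in for `DataSegment` to keep it self-contained): only flagged segments land in the used map, and re-adding with `isUsed=false` evicts a stale entry.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical wiring for the currently-unused `isUsed` flag.
class SegmentStore
{
  final ConcurrentMap<String, String> segments = new ConcurrentHashMap<>();
  final ConcurrentMap<String, String> usedSegments = new ConcurrentHashMap<>();

  void addSegment(String segmentId, String segment, boolean isUsed)
  {
    segments.put(segmentId, segment);
    if (isUsed) {
      usedSegments.put(segmentId, segment);
    } else {
      // keep the maps consistent if a segment is re-added as unused
      usedSegments.remove(segmentId);
    }
  }
}
```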



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java:
##########
@@ -221,7 +265,28 @@ public List<Interval> getUnusedSegmentIntervals(
       final DateTime maxUsedStatusLastUpdatedTime
   )
   {
-    return null;
+    final List<DataSegmentPlus> sortedDataSegments = new ArrayList<>(unusedSegments.values());
+    sortedDataSegments.sort(
+        Comparator.comparingLong(dataSegmentPlus -> dataSegmentPlus.getDataSegment().getInterval().getStartMillis()
+        )

Review Comment:
   Please use consistent styles:
   ```suggestion
           Comparator.comparingLong(
               dataSegmentPlus -> dataSegmentPlus.getDataSegment().getInterval().getStartMillis()
           )
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -146,130 +154,140 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
     return runInternal(params);
   }
 
-  @VisibleForTesting
-  DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params)
+  private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeParams params)
   {
-    TaskStats taskStats = new TaskStats();
     Collection<String> dataSourcesToKill =
         params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
-    double killTaskSlotRatio = params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
-    int maxKillTaskSlots = params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int killTaskCapacity = getKillTaskCapacity(
+    final double killTaskSlotRatio = params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
+    final int maxKillTaskSlots = params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
+    final int killTaskCapacity = getKillTaskCapacity(
         CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
         killTaskSlotRatio,
         maxKillTaskSlots
     );
-    int availableKillTaskSlots = getAvailableKillTaskSlots(
+    final int availableKillTaskSlots = getAvailableKillTaskSlots(
         killTaskCapacity,
         CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, IS_AUTO_KILL_TASK).size()
     );
     final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    taskStats.availableTaskSlots = availableKillTaskSlots;
-    taskStats.maxSlots = killTaskCapacity;
-
     if (0 < availableKillTaskSlots) {
       // If no datasource has been specified, all are eligible for killing unused segments
       if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
         dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
       }
 
-      log.debug("Killing unused segments for datasources[%s]", dataSourcesToKill);
       lastKillTime = DateTimes.nowUtc();
-      taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
+      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
     }
 
     // any datasources that are no longer being considered for kill should have their
     // last kill interval removed from map.
     datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
-    addStats(taskStats, stats);
+    
+    stats.add(Stats.Kill.AVAILABLE_SLOTS, availableKillTaskSlots);
+    stats.add(Stats.Kill.MAX_SLOTS, killTaskCapacity);
     return params;
   }
 
-  private void addStats(
-      TaskStats taskStats,
-      CoordinatorRunStats stats
-  )
-  {
-    stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
-    stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
-    stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
-  }
-
-  private int killUnusedSegments(
-      Collection<String> dataSourcesToKill,
-      int availableKillTaskSlots
+  /**
+   * Spawn kill tasks for each datasource to be killed upto {@code availableKillTaskSlots}.
+   * @param dataSourcesToKill finalized set of datasources to kill
+   * @param availableKillTaskSlots number of available kill task slots
+   */
+  private void killUnusedSegments(
+      @Nullable final Collection<String> dataSourcesToKill,
+      final int availableKillTaskSlots,
+      final CoordinatorRunStats stats
   )
   {
+    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || availableKillTaskSlots <= 0) {
+      stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
+      return;
+    }

Review Comment:
   Much easier to read now, thanks!



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -96,362 +104,805 @@ public class KillUnusedSegmentsTest
   private DataSegment nextDaySegment;
   private DataSegment nextMonthSegment;
 
-  private KillUnusedSegments target;
 
   @Before
   public void setup()
   {
-    Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
-    Mockito.doReturn(stats).when(params).getCoordinatorStats();
-    Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
-    Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
-    Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
-    Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
-    Mockito.doReturn(Duration.parse("PT3154000000S")).when(config).getCoordinatorKillBufferPeriod();
+    segmentsMetadataManager = new TestSegmentsMetadataManager();
+    overlordClient = new TestOverlordClient();
 
-    Mockito.doReturn(Collections.singleton(DATASOURCE))
-           .when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn();
+    // These two can definitely be part of setup()
+    configBuilder = new TestDruidCoordinatorConfig.Builder()
+        .withCoordinatorIndexingPeriod(INDEXING_PERIOD)
+        .withCoordinatorKillPeriod(COORDINATOR_KILL_PERIOD)
+        .withCoordinatorKillDurationToRetain(DURATION_TO_RETAIN)
+        .withCoordinatorKillMaxSegments(MAX_SEGMENTS_TO_KILL)
+        .withCoordinatorKillBufferPeriod(BUFFER_PERIOD);
+    paramsBuilder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
+  }
 
-    final DateTime now = DateTimes.nowUtc();
+  /**
+   * The buffer periood and duration to retain influence kill behavior.
+   */
+  @Test
+  public void testDefaults()

Review Comment:
   Please update this test name to indicate whether it tests only the default config values or the default behaviour of the duty.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -146,130 +154,140 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
     return runInternal(params);
   }
 
-  @VisibleForTesting
-  DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params)
+  private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeParams params)
   {
-    TaskStats taskStats = new TaskStats();
     Collection<String> dataSourcesToKill =
         params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
-    double killTaskSlotRatio = params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
-    int maxKillTaskSlots = params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int killTaskCapacity = getKillTaskCapacity(
+    final double killTaskSlotRatio = params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
+    final int maxKillTaskSlots = params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
+    final int killTaskCapacity = getKillTaskCapacity(
         CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
         killTaskSlotRatio,
         maxKillTaskSlots
     );
-    int availableKillTaskSlots = getAvailableKillTaskSlots(
+    final int availableKillTaskSlots = getAvailableKillTaskSlots(
         killTaskCapacity,
         CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, IS_AUTO_KILL_TASK).size()
     );
     final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    taskStats.availableTaskSlots = availableKillTaskSlots;
-    taskStats.maxSlots = killTaskCapacity;
-
     if (0 < availableKillTaskSlots) {
       // If no datasource has been specified, all are eligible for killing unused segments
       if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
         dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
       }
 
-      log.debug("Killing unused segments for datasources[%s]", dataSourcesToKill);
       lastKillTime = DateTimes.nowUtc();
-      taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
+      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
     }
 
     // any datasources that are no longer being considered for kill should have their
     // last kill interval removed from map.
     datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
-    addStats(taskStats, stats);
+    
+    stats.add(Stats.Kill.AVAILABLE_SLOTS, availableKillTaskSlots);
+    stats.add(Stats.Kill.MAX_SLOTS, killTaskCapacity);
     return params;
   }
 
-  private void addStats(
-      TaskStats taskStats,
-      CoordinatorRunStats stats
-  )
-  {
-    stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
-    stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
-    stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
-  }
-
-  private int killUnusedSegments(
-      Collection<String> dataSourcesToKill,
-      int availableKillTaskSlots
+  /**
+   * Spawn kill tasks for each datasource to be killed upto {@code availableKillTaskSlots}.
+   * @param dataSourcesToKill finalized set of datasources to kill
+   * @param availableKillTaskSlots number of available kill task slots
+   */
+  private void killUnusedSegments(
+      @Nullable final Collection<String> dataSourcesToKill,
+      final int availableKillTaskSlots,
+      final CoordinatorRunStats stats
   )
   {
+    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || availableKillTaskSlots <= 0) {
+      stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
+      return;
+    }
     int submittedTasks = 0;
-    if (0 < availableKillTaskSlots && !CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      for (String dataSource : dataSourcesToKill) {
-        if (submittedTasks >= availableKillTaskSlots) {
-          log.debug(StringUtils.format(
-              "Submitted [%d] kill tasks and reached kill task slot limit [%d]. Will resume "
-              + "on the next coordinator cycle.", submittedTasks, availableKillTaskSlots));
-          break;
-        }
-        final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
-        final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime);
-        if (intervalToKill == null) {
-          datasourceToLastKillIntervalEnd.remove(dataSource);
-          continue;
-        }
+    for (String dataSource : dataSourcesToKill) {
+      if (submittedTasks >= availableKillTaskSlots) {
+        log.info(StringUtils.format(
+            "Submitted [%d] kill tasks and reached kill task slot limit [%d]. Will resume "
+            + "on the next coordinator cycle.", submittedTasks, availableKillTaskSlots));
+        break;
+      }
+      final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
+      final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats);
+      if (intervalToKill == null) {
+        datasourceToLastKillIntervalEnd.remove(dataSource);
+        continue;
+      }
+      if (intervalToKill == null) {
+        datasourceToLastKillIntervalEnd.remove(dataSource);
+        continue;
+      }
 
-        try {
-          FutureUtils.getUnchecked(
-              overlordClient.runKillTask(
-                  TASK_ID_PREFIX,
-                  dataSource,
-                  intervalToKill,
-                  maxSegmentsToKill,
-                  maxUsedStatusLastUpdatedTime
-              ),
-              true
-          );
-          ++submittedTasks;
-          datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
-        }
-        catch (Exception ex) {
-        log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill);
-          if (Thread.currentThread().isInterrupted()) {
-            log.warn("Skipping kill task scheduling because thread is interrupted.");
-            break;
-          }
+      try {
+        FutureUtils.getUnchecked(
+            overlordClient.runKillTask(
+                TASK_ID_PREFIX,
+                dataSource,
+                intervalToKill,
+                maxSegmentsToKill,
+                maxUsedStatusLastUpdatedTime
+            ),
+            true
+        );
+        ++submittedTasks;
+        datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
+      }
+      catch (Exception ex) {
+        log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill);
+        if (Thread.currentThread().isInterrupted()) {
+          log.warn("Skipping kill task scheduling because thread is interrupted.");
+          break;
         }
       }
     }
 
-    if (log.isDebugEnabled()) {
-      log.debug(
-          "Submitted [%d] kill tasks for [%d] datasources.%s",
+    log.info("Submitted [%d] kill tasks for [%d] datasources.%s",
           submittedTasks,
           dataSourcesToKill.size(),
           availableKillTaskSlots < dataSourcesToKill.size()
               ? StringUtils.format(
               " Datasources skipped: %s",
-              ImmutableList.copyOf(dataSourcesToKill).subList(submittedTasks, dataSourcesToKill.size())
-          )
+              ImmutableList.copyOf(dataSourcesToKill).subList(submittedTasks, dataSourcesToKill.size()))
               : ""
-      );

Review Comment:
   This style of determining skipped datasources is extremely convoluted and that too just for logging.
   It would be better to just keep a set of `remainingDatasourcesToKill` and keep removing items from it as soon as tasks are submitted for that datasource.
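   A sketch of that bookkeeping (the name `remainingDatasourcesToKill` comes from the suggestion; task submission is stubbed out as a comment, so this is a shape, not the final implementation): whatever is left in the set after the loop is exactly the skipped datasources, ready for logging.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class SkippedDatasources
{
  // Returns the datasources for which no kill task was submitted.
  static Set<String> submitAndReportSkipped(List<String> dataSourcesToKill, int availableSlots)
  {
    final Set<String> remainingDatasourcesToKill = new LinkedHashSet<>(dataSourcesToKill);
    int submittedTasks = 0;
    for (String dataSource : dataSourcesToKill) {
      if (submittedTasks >= availableSlots) {
        break;
      }
      // ... submit the kill task for dataSource here ...
      remainingDatasourcesToKill.remove(dataSource);
      ++submittedTasks;
    }
    return remainingDatasourcesToKill;
  }
}
```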



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -146,130 +154,140 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
     return runInternal(params);
   }
 
-  @VisibleForTesting
-  DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params)
+  private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeParams params)
   {
-    TaskStats taskStats = new TaskStats();
     Collection<String> dataSourcesToKill =
         params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
-    double killTaskSlotRatio = params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
-    int maxKillTaskSlots = params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int killTaskCapacity = getKillTaskCapacity(
+    final double killTaskSlotRatio = params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
+    final int maxKillTaskSlots = params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
+    final int killTaskCapacity = getKillTaskCapacity(
         CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
         killTaskSlotRatio,
         maxKillTaskSlots
     );
-    int availableKillTaskSlots = getAvailableKillTaskSlots(
+    final int availableKillTaskSlots = getAvailableKillTaskSlots(
         killTaskCapacity,
         CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, IS_AUTO_KILL_TASK).size()
     );
     final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    taskStats.availableTaskSlots = availableKillTaskSlots;
-    taskStats.maxSlots = killTaskCapacity;
-
     if (0 < availableKillTaskSlots) {
       // If no datasource has been specified, all are eligible for killing unused segments
       if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
        dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
       }
 
-      log.debug("Killing unused segments for datasources[%s]", dataSourcesToKill);
       lastKillTime = DateTimes.nowUtc();
-      taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
+      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
     }
 
     // any datasources that are no longer being considered for kill should have their
     // last kill interval removed from map.
     datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
-    addStats(taskStats, stats);
+    
+    stats.add(Stats.Kill.AVAILABLE_SLOTS, availableKillTaskSlots);
+    stats.add(Stats.Kill.MAX_SLOTS, killTaskCapacity);
     return params;
   }
 
-  private void addStats(
-      TaskStats taskStats,
-      CoordinatorRunStats stats
-  )
-  {
-    stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
-    stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
-    stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
-  }
-
-  private int killUnusedSegments(
-      Collection<String> dataSourcesToKill,
-      int availableKillTaskSlots
+  /**
+   * Spawn kill tasks for each datasource to be killed, up to {@code availableKillTaskSlots}.
+   * @param dataSourcesToKill finalized set of datasources to kill
+   * @param availableKillTaskSlots number of available kill task slots
+   */
+  private void killUnusedSegments(
+      @Nullable final Collection<String> dataSourcesToKill,
+      final int availableKillTaskSlots,
+      final CoordinatorRunStats stats
   )
   {
+    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || availableKillTaskSlots <= 0) {
+      stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
+      return;
+    }
     int submittedTasks = 0;
-    if (0 < availableKillTaskSlots && !CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      for (String dataSource : dataSourcesToKill) {
-        if (submittedTasks >= availableKillTaskSlots) {
-          log.debug(StringUtils.format(
-              "Submitted [%d] kill tasks and reached kill task slot limit [%d]. Will resume "
-              + "on the next coordinator cycle.", submittedTasks, availableKillTaskSlots));
-          break;
-        }
-        final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
-        final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime);
-        if (intervalToKill == null) {
-          datasourceToLastKillIntervalEnd.remove(dataSource);
-          continue;
-        }
+    for (String dataSource : dataSourcesToKill) {
+      if (submittedTasks >= availableKillTaskSlots) {
+        log.info(StringUtils.format(
+            "Submitted [%d] kill tasks and reached kill task slot limit [%d]. Will resume "
+            + "on the next coordinator cycle.", submittedTasks, availableKillTaskSlots));
+        break;
+      }
+      final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
+      final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats);
+      if (intervalToKill == null) {
+        datasourceToLastKillIntervalEnd.remove(dataSource);
+        continue;
+      }
 
-        try {
-          FutureUtils.getUnchecked(
-              overlordClient.runKillTask(
-                  TASK_ID_PREFIX,
-                  dataSource,
-                  intervalToKill,
-                  maxSegmentsToKill,
-                  maxUsedStatusLastUpdatedTime
-              ),
-              true
-          );
-          ++submittedTasks;
+          datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
-        }
-        catch (Exception ex) {
-          log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill);
-          if (Thread.currentThread().isInterrupted()) {
-            log.warn("Skipping kill task scheduling because thread is interrupted.");
-            break;
-          }
+      try {
+        FutureUtils.getUnchecked(
+            overlordClient.runKillTask(
+                TASK_ID_PREFIX,
+                dataSource,
+                intervalToKill,
+                maxSegmentsToKill,
+                maxUsedStatusLastUpdatedTime
+            ),
+            true
+        );
+        ++submittedTasks;
+        datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
+      }
+      catch (Exception ex) {
+        log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill);
+        if (Thread.currentThread().isInterrupted()) {
+          log.warn("Skipping kill task scheduling because thread is interrupted.");
+          break;
         }
       }
     }
 
-    if (log.isDebugEnabled()) {
-      log.debug(
-          "Submitted [%d] kill tasks for [%d] datasources.%s",
+    log.info("Submitted [%d] kill tasks for [%d] datasources.%s",

Review Comment:
   The original style was more consistent:
   ```suggestion
       log.info(
          "Submitted [%d] kill tasks for [%d] datasources.%s",
   ```



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -96,362 +104,805 @@ public class KillUnusedSegmentsTest
   private DataSegment nextDaySegment;
   private DataSegment nextMonthSegment;
 
-  private KillUnusedSegments target;
 
   @Before
   public void setup()
   {
-    Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
-    Mockito.doReturn(stats).when(params).getCoordinatorStats();
-    Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
-    Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
-    Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
-    Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
-    Mockito.doReturn(Duration.parse("PT3154000000S")).when(config).getCoordinatorKillBufferPeriod();
+    segmentsMetadataManager = new TestSegmentsMetadataManager();
+    overlordClient = new TestOverlordClient();
 
-    Mockito.doReturn(Collections.singleton(DATASOURCE))
-           .when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn();
+    // These two can definitely be part of setup()
+    configBuilder = new TestDruidCoordinatorConfig.Builder()
+        .withCoordinatorIndexingPeriod(INDEXING_PERIOD)
+        .withCoordinatorKillPeriod(COORDINATOR_KILL_PERIOD)
+        .withCoordinatorKillDurationToRetain(DURATION_TO_RETAIN)
+        .withCoordinatorKillMaxSegments(MAX_SEGMENTS_TO_KILL)
+        .withCoordinatorKillBufferPeriod(BUFFER_PERIOD);
+    paramsBuilder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
+  }
 
-    final DateTime now = DateTimes.nowUtc();
+  /**
+   * The buffer period and duration to retain influence kill behavior.
+   */
+  @Test
+  public void testDefaults()
+  {
+    final DateTime sixtyDaysAgo = NOW.minusDays(60);
 
-    yearOldSegment = createSegmentWithEnd(now.minusDays(365));
-    monthOldSegment = createSegmentWithEnd(now.minusDays(30));
-    dayOldSegment = createSegmentWithEnd(now.minusDays(1));
-    hourOldSegment = createSegmentWithEnd(now.minusHours(1));
-    nextDaySegment = createSegmentWithEnd(now.plusDays(1));
-    nextMonthSegment = createSegmentWithEnd(now.plusDays(30));
-
-    final List<DataSegment> unusedSegments = ImmutableList.of(
-        yearOldSegment,
-        monthOldSegment,
-        dayOldSegment,
-        hourOldSegment,
-        nextDaySegment,
-        nextMonthSegment
-    );
-
-    Mockito.when(
-        segmentsMetadataManager.getUnusedSegmentIntervals(
-            ArgumentMatchers.anyString(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.anyInt(),
-            ArgumentMatchers.any()
-        )
-    ).thenAnswer(invocation -> {
-      DateTime minStartTime = invocation.getArgument(1);
-      DateTime maxEndTime = invocation.getArgument(2);
-      long maxEndMillis = maxEndTime.getMillis();
-      Long minStartMillis = minStartTime != null ? minStartTime.getMillis() : null;
-      List<Interval> unusedIntervals =
-          unusedSegments.stream()
-                        .map(DataSegment::getInterval)
-                        .filter(i -> i.getEnd().getMillis() <= maxEndMillis
-                                     && (null == minStartMillis || i.getStart().getMillis() >= minStartMillis))
-                        .collect(Collectors.toList());
-
-      int limit = invocation.getArgument(3);
-      return unusedIntervals.size() <= limit ? unusedIntervals : unusedIntervals.subList(0, limit);
-    });
-
-    target = new KillUnusedSegments(
+    createAndAddUnusedSegment(DS1, YEAR_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, MONTH_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, DAY_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, HOUR_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, NEXT_DAY, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, Intervals.ETERNITY, sixtyDaysAgo);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        new TestDruidCoordinatorConfig.Builder().build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 1L)),
+        firstRun.getCoordinatorStats()
     );
+
+    validateAndResetState(DS1, YEAR_OLD);
   }
 
   @Test
   public void testRunWithNoIntervalShouldNotKillAnySegments()
   {
-    Mockito.doReturn(null).when(segmentsMetadataManager).getUnusedSegmentIntervals(
-        ArgumentMatchers.anyString(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.anyInt(),
-        ArgumentMatchers.any()
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
     );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    target.run(params);
-    Mockito.verify(overlordClient, Mockito.never())
-           .runKillTask(anyString(), anyString(), any(Interval.class), anyInt(), any(DateTime.class));
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
   }
 
   @Test
   public void testRunWithSpecificDatasourceAndNoIntervalShouldNotKillAnySegments()
   {
-    Mockito.doReturn(Duration.standardDays(400))
-           .when(config).getCoordinatorKillDurationToRetain();
-    target = new KillUnusedSegments(
+    configBuilder.withCoordinatorKillDurationToRetain(Duration.standardDays(400));
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
     );
 
-    // No unused segment is older than the retention period
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    target.run(params);
-    Mockito.verify(overlordClient, Mockito.never())
-           .runKillTask(anyString(), anyString(), any(Interval.class), anyInt(), any(DateTime.class));
+    final DruidCoordinatorRuntimeParams runParams = killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
+  }
+
+  /**
+   * The kill period is honored after the first indexing run.
+   */
+  @Test
+  public void testRunsWithKillPeriod()
+  {
+    configBuilder.withCoordinatorKillPeriod(Duration.standardHours(1));
+
+    setupUnusedSegments();
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            yearOldSegment.getInterval().getStart(),
+            monthOldSegment.getInterval().getEnd()
+        )
+    );
+
+    final DruidCoordinatorRuntimeParams secondRun = killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 1, 10,  ImmutableMap.of(DS1, 2L)),
+        secondRun.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, null);
+  }
+
+  /**
+   * Similar to {@link #testMultipleRuns()}
+   */
+  @Test
+  public void testMultipleDatasources()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(1));
+
+    createAndAddUnusedSegment(DS2, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, NEXT_DAY, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 2, 10, ImmutableMap.of(DS1, 2L, DS2, 2L)),
+        firstKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
+    validateAndResetState(DS2, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd()));
+
+    final DruidCoordinatorRuntimeParams secondKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 4, 20,  ImmutableMap.of(DS1, 4L, DS2, 3L)),
+        secondKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));
+    validateAndResetState(DS2, NEXT_DAY);
+
+    final DruidCoordinatorRuntimeParams thirdKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(30, 5, 30, ImmutableMap.of(DS1, 5L, DS2, 3L)),
+        thirdKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, NEXT_MONTH);
+    validateAndResetState(DS2, null);
+  }
+
+  /**
+   * Even though "more recent" segments are also considered as candidates in the wide kill interval here,
+   * the kill task will narrow down to clean up only the segments that strictly honor the max kill time.
+   * See {@code KillUnusedSegmentsTaskTest#testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime}
+   * for example.
+   */
+  @Test
+  public void testSpreadOutLastMaxSegments()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillBufferPeriod(Duration.standardDays(3));
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(2));
+    createAndAddUnusedSegment(DS1, HOUR_OLD, NOW.minusDays(2));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(10));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 4L)),
+        firstKill.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), NEXT_MONTH.getEnd()));
+  }
+
+  @Test
+  public void testAddOldVersionsLater()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));
+
+    log.info("Second run...");
+    // Add two old unused segments now. These only get killed much later on when the kill
+    // duty eventually round robins its way through until the latest time.
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(1));
+
+    final DruidCoordinatorRuntimeParams secondKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 2, 20, ImmutableMap.of(DS1, 3L)),
+        secondKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, NEXT_MONTH);
+
+    log.info("Third run...");
+    final DruidCoordinatorRuntimeParams thirdKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(30, 2, 30, ImmutableMap.of(DS1, 3L)),
+        thirdKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+
+    log.info("Fourth run...");

Review Comment:
   Please remove these log lines once the tests are settled. The duty itself would be emitting logs.



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java:
##########
@@ -221,7 +265,28 @@ public List<Interval> getUnusedSegmentIntervals(
       final DateTime maxUsedStatusLastUpdatedTime
   )
   {
-    return null;
+    final List<DataSegmentPlus> sortedDataSegments = new ArrayList<>(unusedSegments.values());

Review Comment:
   You could also just use a `TreeSet`.
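A standalone sketch of that idea, using a hypothetical `Seg` record in place of `DataSegmentPlus` and an illustrative comparator:

```java
import java.util.Comparator;
import java.util.TreeSet;

public class TreeSetSketch
{
  // Hypothetical stand-in for DataSegmentPlus: just an id and an interval start.
  record Seg(String id, long intervalStartMillis) {}

  static TreeSet<Seg> sortedSegments()
  {
    // A TreeSet built with the sort comparator keeps entries ordered as they are
    // added, so the copy-into-ArrayList-and-sort step becomes unnecessary.
    TreeSet<Seg> sorted = new TreeSet<>(
        Comparator.comparingLong(Seg::intervalStartMillis).thenComparing(Seg::id)
    );
    sorted.add(new Seg("b", 200));
    sorted.add(new Seg("c", 100));
    sorted.add(new Seg("a", 100));
    return sorted;
  }

  public static void main(String[] args)
  {
    System.out.println(sortedSegments().first().id()); // prints "a"
  }
}
```

Note the comparator must break ties (here by id) so that distinct segments with equal start times are not silently dropped by the set.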



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -96,362 +104,805 @@ public class KillUnusedSegmentsTest
   private DataSegment nextDaySegment;
   private DataSegment nextMonthSegment;
 
-  private KillUnusedSegments target;
 
   @Before
   public void setup()
   {
-    Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
-    Mockito.doReturn(stats).when(params).getCoordinatorStats();
-    Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
-    Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
-    Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
-    Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
-    Mockito.doReturn(Duration.parse("PT3154000000S")).when(config).getCoordinatorKillBufferPeriod();
+    segmentsMetadataManager = new TestSegmentsMetadataManager();
+    overlordClient = new TestOverlordClient();
 
-    Mockito.doReturn(Collections.singleton(DATASOURCE))
-           .when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn();
+    // These two can definitely be part of setup()
+    configBuilder = new TestDruidCoordinatorConfig.Builder()
+        .withCoordinatorIndexingPeriod(INDEXING_PERIOD)
+        .withCoordinatorKillPeriod(COORDINATOR_KILL_PERIOD)
+        .withCoordinatorKillDurationToRetain(DURATION_TO_RETAIN)
+        .withCoordinatorKillMaxSegments(MAX_SEGMENTS_TO_KILL)
+        .withCoordinatorKillBufferPeriod(BUFFER_PERIOD);
+    paramsBuilder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
+  }
 
-    final DateTime now = DateTimes.nowUtc();
+  /**
+   * The buffer period and duration to retain influence kill behavior.
+   */
+  @Test
+  public void testDefaults()
+  {
+    final DateTime sixtyDaysAgo = NOW.minusDays(60);
 
-    yearOldSegment = createSegmentWithEnd(now.minusDays(365));
-    monthOldSegment = createSegmentWithEnd(now.minusDays(30));
-    dayOldSegment = createSegmentWithEnd(now.minusDays(1));
-    hourOldSegment = createSegmentWithEnd(now.minusHours(1));
-    nextDaySegment = createSegmentWithEnd(now.plusDays(1));
-    nextMonthSegment = createSegmentWithEnd(now.plusDays(30));
-
-    final List<DataSegment> unusedSegments = ImmutableList.of(
-        yearOldSegment,
-        monthOldSegment,
-        dayOldSegment,
-        hourOldSegment,
-        nextDaySegment,
-        nextMonthSegment
-    );
-
-    Mockito.when(
-        segmentsMetadataManager.getUnusedSegmentIntervals(
-            ArgumentMatchers.anyString(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.any(),
-            ArgumentMatchers.anyInt(),
-            ArgumentMatchers.any()
-        )
-    ).thenAnswer(invocation -> {
-      DateTime minStartTime = invocation.getArgument(1);
-      DateTime maxEndTime = invocation.getArgument(2);
-      long maxEndMillis = maxEndTime.getMillis();
-      Long minStartMillis = minStartTime != null ? minStartTime.getMillis() : null;
-      List<Interval> unusedIntervals =
-          unusedSegments.stream()
-                        .map(DataSegment::getInterval)
-                        .filter(i -> i.getEnd().getMillis() <= maxEndMillis
-                                     && (null == minStartMillis || i.getStart().getMillis() >= minStartMillis))
-                        .collect(Collectors.toList());
-
-      int limit = invocation.getArgument(3);
-      return unusedIntervals.size() <= limit ? unusedIntervals : unusedIntervals.subList(0, limit);
-    });
-
-    target = new KillUnusedSegments(
+    createAndAddUnusedSegment(DS1, YEAR_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, MONTH_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, DAY_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, HOUR_OLD, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, NEXT_DAY, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, sixtyDaysAgo);
+    createAndAddUnusedSegment(DS1, Intervals.ETERNITY, sixtyDaysAgo);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        new TestDruidCoordinatorConfig.Builder().build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 1L)),
+        firstRun.getCoordinatorStats()
     );
+
+    validateAndResetState(DS1, YEAR_OLD);
   }
 
   @Test
   public void testRunWithNoIntervalShouldNotKillAnySegments()
   {
-    Mockito.doReturn(null).when(segmentsMetadataManager).getUnusedSegmentIntervals(
-        ArgumentMatchers.anyString(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.any(),
-        ArgumentMatchers.anyInt(),
-        ArgumentMatchers.any()
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
     );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    target.run(params);
-    Mockito.verify(overlordClient, Mockito.never())
-           .runKillTask(anyString(), anyString(), any(Interval.class), anyInt(), any(DateTime.class));
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
   }
 
   @Test
   public void testRunWithSpecificDatasourceAndNoIntervalShouldNotKillAnySegments()
   {
-    Mockito.doReturn(Duration.standardDays(400))
-           .when(config).getCoordinatorKillDurationToRetain();
-    target = new KillUnusedSegments(
+    configBuilder.withCoordinatorKillDurationToRetain(Duration.standardDays(400));
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
     );
 
-    // No unused segment is older than the retention period
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    target.run(params);
-    Mockito.verify(overlordClient, Mockito.never())
-           .runKillTask(anyString(), anyString(), any(Interval.class), anyInt(), any(DateTime.class));
+    final DruidCoordinatorRuntimeParams runParams = killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
+  }
+
+  /**
+   * The kill period is honored after the first indexing run.
+   */
+  @Test
+  public void testRunsWithKillPeriod()
+  {
+    configBuilder.withCoordinatorKillPeriod(Duration.standardHours(1));
+
+    setupUnusedSegments();
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstRun = killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            yearOldSegment.getInterval().getStart(),
+            monthOldSegment.getInterval().getEnd()
+        )
+    );
+
+    final DruidCoordinatorRuntimeParams secondRun = killDuty.run(paramsBuilder.build());
+    validateStats(
+        new ExpectedStats(10, 1, 10,  ImmutableMap.of(DS1, 2L)),
+        secondRun.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, null);
+  }
+
+  /**
+   * Similar to {@link #testMultipleRuns()}
+   */
+  @Test
+  public void testMultipleDatasources()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(1));
+
+    createAndAddUnusedSegment(DS2, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, NEXT_DAY, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 2, 10, ImmutableMap.of(DS1, 2L, DS2, 2L)),
+        firstKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
+    validateAndResetState(DS2, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd()));
+
+    final DruidCoordinatorRuntimeParams secondKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 4, 20,  ImmutableMap.of(DS1, 4L, DS2, 3L)),
+        secondKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));
+    validateAndResetState(DS2, NEXT_DAY);
+
+    final DruidCoordinatorRuntimeParams thirdKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(30, 5, 30, ImmutableMap.of(DS1, 5L, DS2, 3L)),
+        thirdKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, NEXT_MONTH);
+    validateAndResetState(DS2, null);
+  }
+
+  /**
+   * Even though "more recent" segments are also considered as candidates in the wide kill interval here,
+   * the kill task will narrow down to clean up only the segments that strictly honor the max kill time.
+   * See {@code KillUnusedSegmentsTaskTest#testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime}
+   * for example.
+   */
+  @Test
+  public void testSpreadOutLastMaxSegments()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillBufferPeriod(Duration.standardDays(3));
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(2));
+    createAndAddUnusedSegment(DS1, HOUR_OLD, NOW.minusDays(2));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(10));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(10));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 4L)),
+        firstKill.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), NEXT_MONTH.getEnd()));
+  }
+
+  @Test
+  public void testAddOldVersionsLater()
+  {
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    createAndAddUnusedSegment(DS1, DAY_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_DAY, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, NEXT_MONTH, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams firstKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));
+
+    log.info("Second run...");
+    // Add two old unused segments now. These only get killed much later on when the kill
+    // duty eventually round robins its way through until the latest time.
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS1, MONTH_OLD, NOW.minusDays(1));
+
+    final DruidCoordinatorRuntimeParams secondKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 2, 20, ImmutableMap.of(DS1, 3L)),
+        secondKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, NEXT_MONTH);
+
+    log.info("Third run...");
+    final DruidCoordinatorRuntimeParams thirdKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(30, 2, 30, ImmutableMap.of(DS1, 3L)),
+        thirdKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+
+    log.info("Fourth run...");
+
+    final DruidCoordinatorRuntimeParams fourthKill = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(40, 3, 40, ImmutableMap.of(DS1, 5L)),
+        fourthKill.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
+  }
+
+  @Test
+  public void testNoDatasources()
+  {
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 0, 10, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS1, null);
+    validateAndResetState(DS2, null);
+  }
+
+  @Test
+  public void testWhiteList()
+  {
+    // Only segments in the whitelisted datasource are killed
+    dynamicConfigBuilder.withSpecificDataSourcesToKillUnusedSegmentsIn(Collections.singleton(DS2));
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
+
+    createAndAddUnusedSegment(DS1, YEAR_OLD, NOW.minusDays(1));
+    createAndAddUnusedSegment(DS2, YEAR_OLD, NOW.minusDays(1));
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 0L, DS2, 1L)),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(DS2, YEAR_OLD);
+    validateAndResetState(DS1, null);
   }
 
   @Test
   public void testDurationToRetain()
   {
     // Only segments more than a day old are killed
-    Interval expectedKillInterval = new Interval(
-        yearOldSegment.getInterval().getStart(),
-        dayOldSegment.getInterval().getEnd()
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        runParams.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            yearOldSegment.getInterval().getStart(), monthOldSegment.getInterval().getEnd())
     );
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(expectedKillInterval);
-    verifyState(ImmutableMap.of(DATASOURCE, dayOldSegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10);
   }
 
   @Test
   public void testNegativeDurationToRetain()
   {
-    // Duration to retain = -1 day, reinit target for config to take effect
-    Mockito.doReturn(DURATION_TO_RETAIN.negated())
-           .when(config).getCoordinatorKillDurationToRetain();
-    target = new KillUnusedSegments(
+    configBuilder.withCoordinatorKillDurationToRetain(DURATION_TO_RETAIN.negated());
+
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 5L)),
+        runParams.getCoordinatorStats()
     );
 
-    // Segments upto 1 day in the future are killed
-    Interval expectedKillInterval = new Interval(
-        yearOldSegment.getInterval().getStart(),
-        nextDaySegment.getInterval().getEnd()
+    validateAndResetState(
+        DS1,
+        new Interval(yearOldSegment.getInterval().getStart(), nextDaySegment.getInterval().getEnd())
     );
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(expectedKillInterval);
-    verifyState(ImmutableMap.of(DATASOURCE, nextDaySegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10);
   }
 
   @Test
   public void testIgnoreDurationToRetain()
   {
-    Mockito.doReturn(true)
-           .when(config).getCoordinatorKillIgnoreDurationToRetain();
-    target = new KillUnusedSegments(
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 6L)),
+        runParams.getCoordinatorStats()
     );
 
-    // All future and past unused segments are killed
-    Interval expectedKillInterval = new Interval(
-        yearOldSegment.getInterval().getStart(),
-        nextMonthSegment.getInterval().getEnd()
+    // All past and future unused segments should be killed
+    validateAndResetState(
+        DS1,
+        new Interval(yearOldSegment.getInterval().getStart(), nextMonthSegment.getInterval().getEnd())
     );
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(expectedKillInterval);
-    verifyState(ImmutableMap.of(DATASOURCE, nextMonthSegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10);
   }
 
   @Test
   public void testMaxSegmentsToKill()
   {
-    Mockito.doReturn(1)
-           .when(config).getCoordinatorKillMaxSegments();
-    target = new KillUnusedSegments(
+    configBuilder.withCoordinatorKillMaxSegments(1);
+
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 1L)),
+        runParams.getCoordinatorStats()
     );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    // Only 1 unused segment is killed
-    runAndVerifyKillInterval(yearOldSegment.getInterval());
-    verifyState(ImmutableMap.of(DATASOURCE, yearOldSegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10);
+    validateAndResetState(
+        DS1,
+        yearOldSegment.getInterval()
+    );
   }
 
   @Test
   public void testMultipleRuns()
   {
-    Mockito.doReturn(true)
-        .when(config).getCoordinatorKillIgnoreDurationToRetain();
-    Mockito.doReturn(2)
-        .when(config).getCoordinatorKillMaxSegments();
-    target = new KillUnusedSegments(
+    configBuilder.withCoordinatorKillIgnoreDurationToRetain(true);
+    configBuilder.withCoordinatorKillMaxSegments(2);
+
+    setupUnusedSegments();
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
         segmentsMetadataManager,
         overlordClient,
-        config
+        configBuilder.build()
+    );
+
+    log.info("First run...");
+    final DruidCoordinatorRuntimeParams firstRun = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(10, 1, 10, ImmutableMap.of(DS1, 2L)),
+        firstRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            yearOldSegment.getInterval().getStart(),
+            monthOldSegment.getInterval().getEnd()
+        )
     );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(new Interval(
-        yearOldSegment.getInterval().getStart(),
-        monthOldSegment.getInterval().getEnd()
-    ));
-    verifyState(ImmutableMap.of(DATASOURCE, monthOldSegment.getInterval().getEnd()));
+    log.info("Second run...");
+    final DruidCoordinatorRuntimeParams secondRun = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(20, 2, 20, ImmutableMap.of(DS1, 4L)),
+        secondRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            dayOldSegment.getInterval().getStart(),
+            hourOldSegment.getInterval().getEnd()
+        )
+    );
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(new Interval(
-        dayOldSegment.getInterval().getStart(),
-        hourOldSegment.getInterval().getEnd()
-    ));
-    verifyState(ImmutableMap.of(DATASOURCE, hourOldSegment.getInterval().getEnd()));
+    log.info("Third run...");
+    final DruidCoordinatorRuntimeParams thirdRun = killDuty.run(paramsBuilder.build());
 
-    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
-    runAndVerifyKillInterval(new Interval(
-        nextDaySegment.getInterval().getStart(),
-        nextMonthSegment.getInterval().getEnd()
-    ));
-    verifyState(ImmutableMap.of(DATASOURCE, nextMonthSegment.getInterval().getEnd()));
-    verifyStats(9, 1, 10, 3);
+    validateStats(
+        new ExpectedStats(30, 3, 30, ImmutableMap.of(DS1, 6L)),
+        thirdRun.getCoordinatorStats()
+    );
+
+    validateAndResetState(
+        DS1,
+        new Interval(
+            nextDaySegment.getInterval().getStart(),
+            nextMonthSegment.getInterval().getEnd()
+        )
+    );
   }
 
   @Test
   public void testKillTaskSlotRatioNoAvailableTaskCapacityForKill()
   {
-    mockTaskSlotUsage(0.10, 10, 1, 5);
-    runAndVerifyNoKill();
-    verifyState(ImmutableMap.of());
-    verifyStats(0, 0, 0);
+    dynamicConfigBuilder.withKillTaskSlotRatio(0.10);
+    dynamicConfigBuilder.withMaxKillTaskSlots(10);
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
+
+    overlordClient = new TestOverlordClient(1, 5);
+
+    overlordClient.addTask(DS1);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(0, 0, 0, ImmutableMap.of()),
+        runParams.getCoordinatorStats()
+    );
+    validateAndResetState(DS1, null);
   }
 
   @Test
   public void testMaxKillTaskSlotsNoAvailableTaskCapacityForKill()
   {
-    mockTaskSlotUsage(1.0, 3, 3, 10);
-    runAndVerifyNoKill();
-    verifyState(ImmutableMap.of());
-    verifyStats(0, 0, 3);
+    dynamicConfigBuilder.withKillTaskSlotRatio(1.0);
+    dynamicConfigBuilder.withMaxKillTaskSlots(3);
+    paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
+
+    overlordClient = new TestOverlordClient(3, 10);
+    overlordClient.addTask(DS1);
+    overlordClient.addTask(DS1);
+    overlordClient.addTask(DS1);
+
+    final KillUnusedSegments killDuty = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        configBuilder.build()
+    );
+
+    final DruidCoordinatorRuntimeParams runParams = killDuty.run(paramsBuilder.build());
+
+    validateStats(
+        new ExpectedStats(0, 0, 3, ImmutableMap.of()),

Review Comment:
   I feel it would be much more readable to just validate the relevant stats individually, rather than having to pass zeroes and empty maps repeatedly. This would allow someone reading the test to instantly recognize exactly what is being verified.
   
   You can see some examples in `RunRulesTest`.
   Something like:
   ```java
   Assert.assertEquals(10L, stats.getSegmentStat(Stats.Segments.ASSIGNED, "normal", DATASOURCE));
   ```


