This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch 29.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/29.0.0 by this push:
     new 4b60c7eea74 Kill tasks honor the buffer period of unused segments (#15710) (#15811)
4b60c7eea74 is described below

commit 4b60c7eea74851d46d8fd17f8e87ed0b16c6448b
Author: Laksh Singla <[email protected]>
AuthorDate: Wed Jan 31 14:31:20 2024 +0530

    Kill tasks honor the buffer period of unused segments (#15710) (#15811)
    
    * Kill tasks should honor the buffer period of unused segments.
    
    - The coordinator duty KillUnusedSegments computes an umbrella interval for
    each datasource to use as the kill interval. An umbrella interval can contain
    multiple unused segments with different used_status_last_updated timestamps.
    For example, consider one unused segment that is 30 days old and another that
    is 1 hour old. Currently, a kill task issued after the 30-day mark would kill
    both unused segments and not retain the 1-hour-old one.
    
    - However, when a kill task is instantiated with this umbrella interval, it
    would kill all the unused segments regardless of their last updated timestamp.
    Kill tasks and RetrieveUnusedSegmentsAction must honor the bufferPeriod to
    avoid killing unused segments in the kill interval prematurely.
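    The cutoff rule described above can be sketched as follows. This is a minimal,
    hypothetical illustration (it uses java.time instead of Druid's Joda-Time types,
    and none of these names are Druid APIs): a segment is eligible for a kill only if
    it was marked unused no later than the cutoff derived from the buffer period.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

public class BufferPeriodSketch
{
  // Hypothetical stand-in for a segment's metadata row.
  static class UnusedSegment
  {
    final String id;
    final Instant usedStatusLastUpdated; // when the segment was marked unused

    UnusedSegment(String id, Instant usedStatusLastUpdated)
    {
      this.id = id;
      this.usedStatusLastUpdated = usedStatusLastUpdated;
    }
  }

  // A segment is killable only if it was marked unused no later than the cutoff.
  static List<UnusedSegment> killable(List<UnusedSegment> unused, Instant maxUsedStatusLastUpdatedTime)
  {
    return unused.stream()
                 .filter(s -> !s.usedStatusLastUpdated.isAfter(maxUsedStatusLastUpdatedTime))
                 .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    final Instant now = Instant.now();
    // The cutoff is derived as (now - bufferPeriod), e.g. a P7D buffer period.
    final Instant cutoff = now.minus(Duration.ofDays(7));
    final List<UnusedSegment> unused = List.of(
        new UnusedSegment("seg-30d", now.minus(Duration.ofDays(30))),
        new UnusedSegment("seg-1h", now.minus(Duration.ofHours(1)))
    );
    // Only the 30-day-old segment falls on or before the cutoff; the 1-hour-old one is retained.
    System.out.println(killable(unused, cutoff).size());
  }
}
```

    With this rule, the 30-day-old segment from the example above is killed while the
    1-hour-old segment survives until its buffer period elapses.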
    
    * Clarify default behavior in docs.
    
    * test comments
    
    * fix canDutyRun()
    
    * small updates.
    
    * checkstyle
    
    * forbidden api fix
    
    * doc fix, unused import, codeql scan error, and cleanup logs.
    
    * Address review comments
    
    * Rename maxUsedFlagLastUpdatedTime to maxUsedStatusLastUpdatedTime
    
    This is consistent with the column name `used_status_last_updated`.
    
    * Apply suggestions from code review
    
    
    
    * Make period Duration type
    
    * Remove older variants of runKillTask() in OverlordClient interface
    
    * Test can now run without waiting for canDutyRun().
    
    * Remove previous variants of retrieveUnusedSegments from internal metadata storage coordinator interface.
    
    Removes the following interface methods in favor of a new method added:
    - retrieveUnusedSegmentsForInterval(String, Interval)
    - retrieveUnusedSegmentsForInterval(String, Interval, Integer)
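    As a sketch, the consolidation means the removed variants map onto the new
    four-argument call with null trailing arguments. The stub below is hypothetical
    and uses plain strings; the real interface takes Druid's Interval and Joda-Time
    DateTime types.

```java
import java.util.Collections;
import java.util.List;

public class ConsolidatedRetrieveSketch
{
  // Hypothetical stub of the consolidated metadata-coordinator method; the real
  // signature is (String dataSource, Interval interval, Integer limit, DateTime maxUsedStatusLastUpdatedTime).
  interface Coordinator
  {
    List<String> retrieveUnusedSegmentsForInterval(
        String dataSource,
        String interval,
        Integer limit,
        String maxUsedStatusLastUpdatedTime
    );
  }

  public static void main(String[] args)
  {
    final Coordinator coordinator = (ds, itv, limit, maxTime) -> Collections.emptyList();

    // Former retrieveUnusedSegmentsForInterval(String, Interval) becomes:
    coordinator.retrieveUnusedSegmentsForInterval("wiki", "2019/2020", null, null);

    // Former retrieveUnusedSegmentsForInterval(String, Interval, Integer) becomes:
    coordinator.retrieveUnusedSegmentsForInterval("wiki", "2019/2020", 100, null);

    System.out.println("ok");
  }
}
```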
    
    * Chain stream operations
    
    * cleanup
    
    * Pass in the lastUpdatedTime to markUnused test function and remove sleep.
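    For reference, a kill task payload exercising the new field might look like the
    following sketch (the datasource, interval, and timestamp values are illustrative,
    not taken from this commit):

```json
{
  "type": "kill",
  "id": "kill_wikipedia_example",
  "dataSource": "wikipedia",
  "interval": "2023-01-01/2024-01-01",
  "batchSize": 100,
  "limit": 1000,
  "maxUsedStatusLastUpdatedTime": "2024-01-24T00:00:00.000Z"
}
```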
    
    ---------
    
    Co-authored-by: Abhishek Radhakrishnan <[email protected]>
    Co-authored-by: Kashif Faraz <[email protected]>
---
 docs/data-management/delete.md                     |  10 +-
 docs/operations/clean-metadata-store.md            |  11 +-
 .../actions/RetrieveUnusedSegmentsAction.java      |  18 +-
 .../druid/indexing/common/task/ArchiveTask.java    |   2 +-
 .../common/task/KillUnusedSegmentsTask.java        |  39 +-
 .../druid/indexing/common/task/MoveTask.java       |   2 +-
 .../druid/indexing/common/task/RestoreTask.java    |   2 +-
 .../actions/RetrieveSegmentsActionsTest.java       |  20 +-
 ...ClientKillUnusedSegmentsTaskQuerySerdeTest.java |  37 +-
 .../common/task/KillUnusedSegmentsTaskTest.java    | 495 +++++++++++++++++++--
 .../druid/indexing/overlord/TaskLifecycleTest.java |  20 +-
 .../overlord/http/OverlordResourceTest.java        |   2 +-
 .../TestIndexerMetadataStorageCoordinator.java     |  28 +-
 .../ClientKillUnusedSegmentsTaskQuery.java         |  35 +-
 .../IndexerMetadataStorageCoordinator.java         |  16 +-
 .../IndexerSQLMetadataStorageCoordinator.java      |  22 +-
 .../druid/metadata/SegmentsMetadataManager.java    |  18 +-
 .../druid/metadata/SqlSegmentsMetadataManager.java |   9 +-
 .../druid/metadata/SqlSegmentsMetadataQuery.java   |  56 ++-
 .../apache/druid/rpc/indexing/OverlordClient.java  |  34 +-
 .../coordinator/duty/KillUnusedSegments.java       | 117 +++--
 .../druid/server/http/DataSourcesResource.java     |   2 +-
 .../ClientKillUnusedSegmentsTaskQueryTest.java     |   3 +-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 213 +++++++--
 .../druid/rpc/indexing/OverlordClientImplTest.java |   1 +
 .../coordinator/duty/KillUnusedSegmentsTest.java   |  56 +--
 .../simulate/TestSegmentsMetadataManager.java      |   2 +-
 .../druid/server/http/DataSourcesResourceTest.java |   2 +-
 28 files changed, 980 insertions(+), 292 deletions(-)

diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md
index 6acd2fc782b..fccb14007b9 100644
--- a/docs/data-management/delete.md
+++ b/docs/data-management/delete.md
@@ -95,9 +95,10 @@ The available grammar is:
     "id": <task_id>,
     "dataSource": <task_datasource>,
     "interval" : <all_unused_segments_in_this_interval_will_die!>,
-    "context": <task context>,
-    "batchSize": <optional_batch size>,
-    "limit": <the maximum number of segments to delete>
+    "context": <task_context>,
+    "batchSize": <optional_batch_size>,
+    "limit": <optional_maximum_number_of_segments_to_delete>,
+    "maxUsedStatusLastUpdatedTime": <optional_maximum_timestamp_when_segments_were_marked_as_unused>
 }
 ```
 
@@ -106,7 +107,8 @@ Some of the parameters used in the task payload are further explained below:
 | Parameter   | Default         | Explanation |
 |-------------|-----------------|-------------|
 | `batchSize`    |100    | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.|
-| `limit`     | null - no limit | Maximum number of segments for the kill task to delete.|
+| `limit`     | null (no limit) | Maximum number of segments for the kill task to delete.|
+| `maxUsedStatusLastUpdatedTime` | null (no cutoff) | Maximum timestamp used as a cutoff to include unused segments. The kill task only considers segments which lie in the specified `interval` and were marked as unused no later than this time. The default behavior is to kill all unused segments in the `interval` regardless of when they were marked as unused.|
 
 
 **WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and
diff --git a/docs/operations/clean-metadata-store.md b/docs/operations/clean-metadata-store.md
index 202b27805ed..b30374123ee 100644
--- a/docs/operations/clean-metadata-store.md
+++ b/docs/operations/clean-metadata-store.md
@@ -86,6 +86,8 @@ Only applies to the specified datasources in the dynamic configuration parameter
 If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources.
 - `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`.
 - `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation that segments become eligible for deletion.
+- `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator considers all unused segments as eligible to be killed.
+- `druid.coordinator.kill.bufferPeriod`: Defines the amount of time that a segment must be unused before it can be permanently removed from metadata and deep storage. This serves as a buffer period to prevent data loss if data ends up being needed after being marked unused.
 - `druid.coordinator.kill.maxSegments`: Defines the maximum number of segments to delete per kill task.
 
 ### Audit records
@@ -189,15 +191,15 @@ druid.coordinator.kill.datasource.on=false
 <a name="example"></a>
 ## Example configuration for automated metadata cleanup
 
-Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old. The exception is for audit logs, which you need to retain for 30 days:
+Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old after a seven day buffer period in case you want to recover the data. The exception is for audit logs, which you [...]
 
 ```properties
 ...
 # Schedule the metadata management store task for every hour:
-druid.coordinator.period.metadataStoreManagementPeriod=P1H
+druid.coordinator.period.metadataStoreManagementPeriod=PT1H
 
-# Set a kill task to poll every day to delete Segment records and segments
-# in deep storage > 4 days old. When druid.coordinator.kill.on is set to true,
+# Set a kill task to poll every day to delete segment records and segments
+# in deep storage > 4 days old after a 7-day buffer period. When druid.coordinator.kill.on is set to true,
 # you can set killDataSourceWhitelist in the dynamic configuration to limit
 # the datasources that can be killed.
 # Required also for automated cleanup of rules and compaction configuration.
@@ -205,6 +207,7 @@ druid.coordinator.period.metadataStoreManagementPeriod=P1H
 druid.coordinator.kill.on=true
 druid.coordinator.kill.period=P1D
 druid.coordinator.kill.durationToRetain=P4D
+druid.coordinator.kill.bufferPeriod=P7D
 druid.coordinator.kill.maxSegments=1000
 
 # Poll every day to delete audit records > 30 days old
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
index 150648858c1..bb188952966 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -42,16 +43,21 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment
   @JsonIgnore
   private final Integer limit;
 
+  @JsonIgnore
+  private final DateTime maxUsedStatusLastUpdatedTime;
+
   @JsonCreator
   public RetrieveUnusedSegmentsAction(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
-      @JsonProperty("limit") @Nullable Integer limit
+      @JsonProperty("limit") @Nullable Integer limit,
+      @JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
   )
   {
     this.dataSource = dataSource;
     this.interval = interval;
     this.limit = limit;
+    this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
   }
 
   @JsonProperty
@@ -73,6 +79,13 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment
     return limit;
   }
 
+  @Nullable
+  @JsonProperty
+  public DateTime getMaxUsedStatusLastUpdatedTime()
+  {
+    return maxUsedStatusLastUpdatedTime;
+  }
+
   @Override
   public TypeReference<List<DataSegment>> getReturnTypeReference()
   {
@@ -83,7 +96,7 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment
   public List<DataSegment> perform(Task task, TaskActionToolbox toolbox)
   {
     return toolbox.getIndexerMetadataStorageCoordinator()
-        .retrieveUnusedSegmentsForInterval(dataSource, interval, limit);
+        .retrieveUnusedSegmentsForInterval(dataSource, interval, limit, maxUsedStatusLastUpdatedTime);
   }
 
   @Override
@@ -99,6 +112,7 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment
            "dataSource='" + dataSource + '\'' +
            ", interval=" + interval +
            ", limit=" + limit +
+           ", maxUsedStatusLastUpdatedTime=" + maxUsedStatusLastUpdatedTime +
            '}';
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
index 42d04316a41..957317ad93c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
@@ -79,7 +79,7 @@ public class ArchiveTask extends AbstractFixedIntervalTask
     // List unused segments
     final List<DataSegment> unusedSegments = toolbox
         .getTaskActionClient()
-        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
+        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));
 
     // Verify none of these segments have versions > lock version
     for (final DataSegment unusedSegment : unusedSegments) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 54fae94684f..cbf4a84ba79 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -63,9 +63,10 @@ import java.util.stream.Collectors;
 /**
  * The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
  * JSON serialization fields of this class must correspond to those of {@link
- * ClientKillUnusedSegmentsTaskQuery}, except for "id" and "context" fields.
+ * ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
  * <p>
  * The field {@link #isMarkAsUnused()} is now deprecated.
+ * </p>
  */
 public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 {
@@ -95,6 +96,12 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
    */
   @Nullable private final Integer limit;
 
+  /**
+   * The maximum used status last updated time. Any segments with
+   * {@code used_status_last_updated} no later than this time will be included in the kill task.
+   */
+  @Nullable private final DateTime maxUsedStatusLastUpdatedTime;
+
   @JsonCreator
   public KillUnusedSegmentsTask(
       @JsonProperty("id") String id,
@@ -103,7 +110,8 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
       @JsonProperty("context") Map<String, Object> context,
       @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
       @JsonProperty("batchSize") Integer batchSize,
-      @JsonProperty("limit") @Nullable Integer limit
+      @JsonProperty("limit") @Nullable Integer limit,
+      @JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
   )
   {
     super(
@@ -115,15 +123,16 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
     this.markAsUnused = markAsUnused != null && markAsUnused;
     this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
     if (this.batchSize <= 0) {
-      throw InvalidInput.exception("batchSize[%d] must be a positive integer.", limit);
+      throw InvalidInput.exception("batchSize[%d] must be a positive integer.", batchSize);
     }
     if (limit != null && limit <= 0) {
-      throw InvalidInput.exception("Limit[%d] must be a positive integer.", limit);
+      throw InvalidInput.exception("limit[%d] must be a positive integer.", limit);
     }
     if (limit != null && Boolean.TRUE.equals(markAsUnused)) {
-      throw InvalidInput.exception("Limit cannot be provided when markAsUnused is enabled.");
+      throw InvalidInput.exception("limit[%d] cannot be provided when markAsUnused is enabled.", limit);
     }
     this.limit = limit;
+    this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
   }
 
   /**
@@ -155,6 +164,13 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
     return limit;
   }
 
+  @Nullable
+  @JsonProperty
+  public DateTime getMaxUsedStatusLastUpdatedTime()
+  {
+    return maxUsedStatusLastUpdatedTime;
+  }
+
   @Override
   public String getType()
   {
@@ -180,7 +196,8 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
       numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit(
           new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
       );
-      LOG.info("Marked [%d] segments as unused.", numSegmentsMarkedAsUnused);
+      LOG.info("Marked [%d] segments of datasource[%s] in interval[%s] as unused.",
+               numSegmentsMarkedAsUnused, getDataSource(), getInterval());
     } else {
       numSegmentsMarkedAsUnused = 0;
     }
@@ -190,9 +207,13 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
     @Nullable Integer numTotalBatches = getNumTotalBatches();
     List<DataSegment> unusedSegments;
     LOG.info(
-        "Starting kill with batchSize[%d], up to limit[%d] segments will be deleted%s",
+        "Starting kill for datasource[%s] in interval[%s] with batchSize[%d], up to limit[%d] segments "
+        + "before maxUsedStatusLastUpdatedTime[%s] will be deleted%s",
+        getDataSource(),
+        getInterval(),
         batchSize,
         limit,
+        maxUsedStatusLastUpdatedTime,
        numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
     );
 
@@ -217,7 +238,9 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 
       unusedSegments = toolbox
               .getTaskActionClient()
-              .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
+              .submit(
+                  new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize, maxUsedStatusLastUpdatedTime
+              ));
 
      // Fetch locks each time as a revokal could have occurred in between batches
       final NavigableMap<DateTime, List<TaskLock>> taskLockMap
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
index d23b3820db7..bea3685ca24 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
@@ -87,7 +87,7 @@ public class MoveTask extends AbstractFixedIntervalTask
     // List unused segments
     final List<DataSegment> unusedSegments = toolbox
         .getTaskActionClient()
-        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
+        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));
 
     // Verify none of these segments have versions > lock version
     for (final DataSegment unusedSegment : unusedSegments) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
index 1364bcb597f..cc635383fed 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
@@ -80,7 +80,7 @@ public class RestoreTask extends AbstractFixedIntervalTask
     // List unused segments
     final List<DataSegment> unusedSegments = toolbox
         .getTaskActionClient()
-        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
+        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));
 
     // Verify none of these segments have versions > lock version
     for (final DataSegment unusedSegment : unusedSegments) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
index 09f724bcfc1..2dee594aad6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
@@ -20,8 +20,10 @@
 package org.apache.druid.indexing.common.actions;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -103,7 +105,23 @@ public class RetrieveSegmentsActionsTest
   @Test
   public void testRetrieveUnusedSegmentsAction()
   {
-    final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null);
+    final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, null);
+    final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
+    Assert.assertEquals(expectedUnusedSegments, resultSegments);
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime()
+  {
+    final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.MIN);
+    final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
+    Assert.assertEquals(ImmutableSet.of(), resultSegments);
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsActionWithNowUsedLastUpdatedTime()
+  {
+    final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.nowUtc());
     final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
     Assert.assertEquals(expectedUnusedSegments, resultSegments);
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
index 3ab6bae4688..7e7c9088d61 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
 import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
 import org.apache.druid.client.indexing.ClientTaskQuery;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.junit.Assert;
 import org.junit.Before;
@@ -53,7 +54,8 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
         Intervals.of("2020-01-01/P1D"),
         false,
         99,
-        5
+        5,
+        DateTimes.nowUtc()
     );
     final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
     final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
@@ -63,7 +65,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
     Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
     Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
     Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());
-
+    Assert.assertEquals(taskQuery.getMaxUsedStatusLastUpdatedTime(), fromJson.getMaxUsedStatusLastUpdatedTime());
   }
 
   @Test
@@ -75,6 +77,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
             Intervals.of("2020-01-01/P1D"),
             true,
             null,
+            null,
             null
     );
     final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
@@ -85,6 +88,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
     Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
     Assert.assertEquals(100, fromJson.getBatchSize());
     Assert.assertNull(taskQuery.getLimit());
+    Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime());
   }
 
   @Test
@@ -97,6 +101,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
         null,
         true,
         99,
+        null,
         null
     );
     final byte[] json = objectMapper.writeValueAsBytes(task);
@@ -110,5 +115,33 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
     Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
     Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
     Assert.assertNull(task.getLimit());
+    Assert.assertNull(task.getMaxUsedStatusLastUpdatedTime());
+  }
+
+  @Test
+  public void testKillUnusedSegmentsTaskWithNonNullValuesToClientKillUnusedSegmentsTaskQuery() throws IOException
+  {
+    final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask(
+        null,
+        "datasource",
+        Intervals.of("2020-01-01/P1D"),
+        null,
+        null,
+        99,
+        100,
+        DateTimes.nowUtc()
+    );
+    final byte[] json = objectMapper.writeValueAsBytes(task);
+    final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
+        json,
+        ClientTaskQuery.class
+    );
+    Assert.assertEquals(task.getId(), taskQuery.getId());
+    Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
+    Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
+    Assert.assertNull(taskQuery.getMarkAsUnused());
+    Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
+    Assert.assertEquals(task.getLimit(), taskQuery.getLimit());
+    Assert.assertEquals(task.getMaxUsedStatusLastUpdatedTime(), taskQuery.getMaxUsedStatusLastUpdatedTime());
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index d77f2e6c243..e2c433536a2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.indexing.common.task;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexing.common.KillTaskReport;
 import org.apache.druid.indexing.common.TaskReport;
@@ -29,8 +31,11 @@ import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.timeline.DataSegment;
 import org.assertj.core.api.Assertions;
+import org.hamcrest.MatcherAssert;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
@@ -40,6 +45,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class KillUnusedSegmentsTaskTest extends IngestionTestBase
 {
@@ -86,18 +92,24 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
             null,
             false,
             null,
+            null,
             null
         );
 
     Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
 
-    final List<DataSegment> unusedSegments =
-        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
+    final List<DataSegment> unusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            Intervals.of("2019/2020"),
+            null,
+            null
+        );
 
     Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments);
-    Assertions.assertThat(
-        getMetadataStorageCoordinator()
-            .retrieveUsedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"), Segments.ONLY_VISIBLE)
+    Assertions.assertThat(getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval(
+        DATA_SOURCE,
+        Intervals.of("2019/2020"),
+        Segments.ONLY_VISIBLE)
     ).containsExactlyInAnyOrder(
         newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
         newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
@@ -135,13 +147,19 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
             null,
             true,
             null,
+            null,
             null
         );
 
     Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
 
     final List<DataSegment> unusedSegments =
-        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            Intervals.of("2019/2020"),
+            null,
+            null
+        );
 
     Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments);
     Assertions.assertThat(
@@ -166,6 +184,7 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             null,
             true,
             null,
+            null,
             null
         );
     Assert.assertTrue(task.getInputSourceResources().isEmpty());
@@ -176,10 +195,10 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
   {
     final String version = DateTimes.nowUtc().toString();
     final Set<DataSegment> segments = ImmutableSet.of(
-            newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
-            newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
-            newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
-            newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
+        newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
+        newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
+        newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
+        newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
     );
     final Set<DataSegment> announced = 
getMetadataStorageCoordinator().commitSegments(segments);
 
@@ -194,58 +213,374 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
     );
 
     final KillUnusedSegmentsTask task =
-            new KillUnusedSegmentsTask(
-                    null,
-                    DATA_SOURCE,
-                    Intervals.of("2018-01-01/2020-01-01"),
-                    null,
-                    false,
-                    1,
-                4
-            );
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2018-01-01/2020-01-01"),
+            null,
+            false,
+            1,
+            4,
+            null
+        );
 
     Assert.assertEquals(TaskState.SUCCESS, 
taskRunner.run(task).get().getStatusCode());
 
     // we expect ALL segments to be deleted
 
     final List<DataSegment> unusedSegments =
-            
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, 
Intervals.of("2019/2020"));
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            Intervals.of("2019/2020"),
+            null,
+            null
+        );
 
     Assert.assertEquals(Collections.emptyList(), unusedSegments);
     Assert.assertEquals(new KillTaskReport.Stats(4, 4, 0), getReportedStats());
   }
 
+  /**
+   * Tests killing multiple unused segments in a wide interval with different {@code used_status_last_updated}
+   * timestamps. A kill task submitted with a null {@code maxUsedStatusLastUpdatedTime} will kill all the unused
+   * segments in the kill interval.
+   */
+  @Test
+  public void 
testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime() throws 
Exception
+  {
+    final String version = DateTimes.nowUtc().toString();
+    final DataSegment segment1 = 
newSegment(Intervals.of("2019-01-01/2019-02-01"), version);
+    final DataSegment segment2 = 
newSegment(Intervals.of("2019-02-01/2019-03-01"), version);
+    final DataSegment segment3 = 
newSegment(Intervals.of("2019-03-01/2019-04-01"), version);
+    final DataSegment segment4 = 
newSegment(Intervals.of("2019-04-01/2019-05-01"), version);
+
+    final Set<DataSegment> segments = ImmutableSet.of(segment1, segment2, 
segment3, segment4);
+    final Set<DataSegment> announced = 
getMetadataStorageCoordinator().commitSegments(segments);
+
+    Assert.assertEquals(segments, announced);
+
+    Assert.assertEquals(
+        1,
+        getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+            DATA_SOURCE,
+            segment1.getInterval()
+        )
+    );
+
+    Assert.assertEquals(
+        1,
+        getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+            DATA_SOURCE,
+            segment4.getInterval()
+        )
+    );
+
+    Assert.assertEquals(
+        1,
+        getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+            DATA_SOURCE,
+            segment3.getInterval()
+        )
+    );
+
+    final List<Interval> segmentIntervals = segments.stream()
+                                                    
.map(DataSegment::getInterval)
+                                                    
.collect(Collectors.toList());
+
+    final Interval umbrellaInterval = 
JodaUtils.umbrellaInterval(segmentIntervals);
+
+
+    final KillUnusedSegmentsTask task =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            umbrellaInterval,
+            null,
+            false,
+            1,
+            10,
+            null
+        );
+
+    Assert.assertEquals(TaskState.SUCCESS, 
taskRunner.run(task).get().getStatusCode());
+
+    final List<DataSegment> unusedSegments =
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            umbrellaInterval,
+            null,
+            null
+        );
+
+    Assert.assertEquals(ImmutableList.of(), unusedSegments);
+    Assert.assertEquals(new KillTaskReport.Stats(3, 4, 0), getReportedStats());
+  }
+
+  /**
+   * Tests killing multiple unused segments in a wide interval with different {@code used_status_last_updated}
+   * timestamps. Consider:
+   * <li> {@code segment1} and {@code segment4} are marked as unused at t1, and {@code segment3} is marked as
+   * unused at t2, where t1 < t2 </li>
+   * <li> {@code segment2} is a used segment and therefore shouldn't be killed </li>
+   *
+   * <p>
+   * A kill task submitted with t1 as the {@code maxUsedStatusLastUpdatedTime} should only kill {@code segment1}
+   * and {@code segment4}. After that, a kill task submitted with t2 as the {@code maxUsedStatusLastUpdatedTime}
+   * should kill {@code segment3}.
+   * </p>
+   */
+  @Test
+  public void 
testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime() 
throws Exception
+  {
+    final String version = DateTimes.nowUtc().toString();
+    final DataSegment segment1 = 
newSegment(Intervals.of("2019-01-01/2019-02-01"), version);
+    final DataSegment segment2 = 
newSegment(Intervals.of("2019-02-01/2019-03-01"), version);
+    final DataSegment segment3 = 
newSegment(Intervals.of("2019-03-01/2019-04-01"), version);
+    final DataSegment segment4 = 
newSegment(Intervals.of("2019-04-01/2019-05-01"), version);
+
+    final Set<DataSegment> segments = ImmutableSet.of(segment1, segment2, 
segment3, segment4);
+    final Set<DataSegment> announced = 
getMetadataStorageCoordinator().commitSegments(segments);
+
+    Assert.assertEquals(segments, announced);
+
+    Assert.assertEquals(
+        1,
+        getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+            DATA_SOURCE,
+            segment1.getInterval()
+        )
+    );
+
+    Assert.assertEquals(
+        1,
+        getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+            DATA_SOURCE,
+            segment4.getInterval()
+        )
+    );
+
+    // Capture the last updated time cutoff
+    final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc();
+
+    // Delay for 1s, mark the third segment as unused, and then capture the last updated time cutoff again
+    Thread.sleep(1000);
+
+    // now mark the third segment as unused
+    Assert.assertEquals(
+        1,
+        getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+            DATA_SOURCE,
+            segment3.getInterval()
+        )
+    );
+
+    final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc();
+
+
+    final List<Interval> segmentIntervals = segments.stream()
+                                                    
.map(DataSegment::getInterval)
+                                                    
.collect(Collectors.toList());
+
+    final Interval umbrellaInterval = 
JodaUtils.umbrellaInterval(segmentIntervals);
+
+    final KillUnusedSegmentsTask task1 =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            umbrellaInterval,
+            null,
+            false,
+            1,
+            10,
+            maxUsedStatusLastUpdatedTime1
+        );
+
+    Assert.assertEquals(TaskState.SUCCESS, 
taskRunner.run(task1).get().getStatusCode());
+
+    final List<DataSegment> unusedSegments =
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            umbrellaInterval,
+            null,
+            null
+        );
+
+    Assert.assertEquals(ImmutableList.of(segment3), unusedSegments);
+    Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats());
+
+    final KillUnusedSegmentsTask task2 =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            umbrellaInterval,
+            null,
+            false,
+            1,
+            10,
+            maxUsedStatusLastUpdatedTime2
+        );
+
+    Assert.assertEquals(TaskState.SUCCESS, 
taskRunner.run(task2).get().getStatusCode());
+
+    final List<DataSegment> unusedSegments2 =
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            umbrellaInterval,
+            null,
+            null
+        );
+
+    Assert.assertEquals(ImmutableList.of(), unusedSegments2);
+    Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats());
+  }
+
+  /**
+   * Similar to {@link 
#testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime()}, 
but with a different setup.
+   * <p>
+   * Tests killing multiple unused segments in a wide interval with different {@code used_status_last_updated}
+   * timestamps. Consider:
+   * <li> {@code segment1} and {@code segment4} have t1 {@code used_status_last_updated} timestamp </li>
+   * <li> {@code segment2} and {@code segment3} have t2 {@code used_status_last_updated} timestamp, where t1 < t2 </li>
+   *
+   * <p>
+   * A kill task submitted with t1 as the {@code maxUsedStatusLastUpdatedTime} should only kill {@code segment1}
+   * and {@code segment4}. After that, a kill task submitted with t2 as the {@code maxUsedStatusLastUpdatedTime}
+   * should kill {@code segment2} and {@code segment3}.
+   * </p>
+   */
+  @Test
+  public void 
testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime2() 
throws Exception
+  {
+    final String version = DateTimes.nowUtc().toString();
+    final DataSegment segment1 = 
newSegment(Intervals.of("2019-01-01/2019-02-01"), version);
+    final DataSegment segment2 = 
newSegment(Intervals.of("2019-02-01/2019-03-01"), version);
+    final DataSegment segment3 = 
newSegment(Intervals.of("2019-03-01/2019-04-01"), version);
+    final DataSegment segment4 = 
newSegment(Intervals.of("2019-04-01/2019-05-01"), version);
+
+    final Set<DataSegment> segments = ImmutableSet.of(segment1, segment2, 
segment3, segment4);
+    final Set<DataSegment> announced = 
getMetadataStorageCoordinator().commitSegments(segments);
+
+    Assert.assertEquals(segments, announced);
+
+    Assert.assertEquals(
+        2,
+        getSegmentsMetadataManager().markSegmentsAsUnused(
+            ImmutableSet.of(
+                segment1.getId(),
+                segment4.getId()
+            )
+        )
+    );
+
+    final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc();
+
+    // Delay for 1s, mark the segments as unused and then capture the last 
updated time cutoff again
+    Thread.sleep(1000);
+
+    Assert.assertEquals(
+        2,
+        getSegmentsMetadataManager().markSegmentsAsUnused(
+            ImmutableSet.of(
+                segment2.getId(),
+                segment3.getId()
+            )
+        )
+    );
+
+    final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc();
+
+
+    final List<Interval> segmentIntervals = segments.stream()
+                                                    
.map(DataSegment::getInterval)
+                                                    
.collect(Collectors.toList());
+
+    final Interval umbrellaInterval = 
JodaUtils.umbrellaInterval(segmentIntervals);
+
+
+    final KillUnusedSegmentsTask task1 =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            umbrellaInterval,
+            null,
+            false,
+            1,
+            10,
+            maxUsedStatusLastUpdatedTime1
+        );
+
+    Assert.assertEquals(TaskState.SUCCESS, 
taskRunner.run(task1).get().getStatusCode());
+
+    final List<DataSegment> unusedSegments =
+        getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+                DATA_SOURCE,
+                umbrellaInterval,
+                null,
+                null
+            );
+
+    Assert.assertEquals(ImmutableList.of(segment2, segment3), unusedSegments);
+    Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats());
+
+    final KillUnusedSegmentsTask task2 =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            umbrellaInterval,
+            null,
+            false,
+            1,
+            10,
+            maxUsedStatusLastUpdatedTime2
+        );
+
+    Assert.assertEquals(TaskState.SUCCESS, 
taskRunner.run(task2).get().getStatusCode());
+
+    final List<DataSegment> unusedSegments2 = 
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            umbrellaInterval,
+            null,
+            null
+        );
+
+    Assert.assertEquals(ImmutableList.of(), unusedSegments2);
+    Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats());
+  }
+
   @Test
   public void testKillBatchSizeThree() throws Exception
   {
     final String version = DateTimes.nowUtc().toString();
     final Set<DataSegment> segments = ImmutableSet.of(
-            newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
-            newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
-            newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
-            newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
+        newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
+        newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
+        newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
+        newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
     );
     final Set<DataSegment> announced = 
getMetadataStorageCoordinator().commitSegments(segments);
 
     Assert.assertEquals(segments, announced);
 
     final KillUnusedSegmentsTask task =
-            new KillUnusedSegmentsTask(
-                    null,
-                    DATA_SOURCE,
-                    Intervals.of("2018-01-01/2020-01-01"),
-                    null,
-                    true,
-                    3,
-                null
-            );
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2018-01-01/2020-01-01"),
+            null,
+            true,
+            3,
+            null,
+            null
+        );
 
     Assert.assertEquals(TaskState.SUCCESS, 
taskRunner.run(task).get().getStatusCode());
 
     // we expect ALL segments to be deleted
 
-    final List<DataSegment> unusedSegments =
-            
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, 
Intervals.of("2019/2020"));
+    final List<DataSegment> unusedSegments = 
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+            DATA_SOURCE,
+            Intervals.of("2019/2020"),
+            null,
+            null
+        );
 
     Assert.assertEquals(Collections.emptyList(), unusedSegments);
 
@@ -263,6 +598,7 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             null,
             false,
             null,
+            null,
             null
         );
     Assert.assertEquals(100, task.computeNextBatchSize(50));
@@ -279,7 +615,8 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             null,
             false,
             10,
-            5
+            5,
+            null
         );
     Assert.assertEquals(5, task.computeNextBatchSize(0));
   }
@@ -295,7 +632,8 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             null,
             false,
             5,
-            10
+            10,
+            null
         );
     Assert.assertEquals(5, task.computeNextBatchSize(0));
   }
@@ -311,7 +649,8 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             null,
             false,
             5,
-            10
+            10,
+            null
         );
     Assert.assertEquals(3, task.computeNextBatchSize(7));
   }
@@ -327,6 +666,7 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             null,
             false,
             null,
+            null,
             null
         );
     Assert.assertNull(task.getNumTotalBatches());
@@ -343,11 +683,81 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             null,
             false,
             10,
-            5
+            5,
+            null
         );
     Assert.assertEquals(1, (int) task.getNumTotalBatches());
   }
 
+  @Test
+  public void testInvalidLimit()
+  {
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            DruidException.class,
+            () -> new KillUnusedSegmentsTask(
+                null,
+                DATA_SOURCE,
+                Intervals.of("2018-01-01/2020-01-01"),
+                null,
+                false,
+                10,
+                0,
+                null
+            )
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            "limit[0] must be a positive integer."
+        )
+    );
+  }
+
+  @Test
+  public void testInvalidBatchSize()
+  {
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            DruidException.class,
+            () -> new KillUnusedSegmentsTask(
+                null,
+                DATA_SOURCE,
+                Intervals.of("2018-01-01/2020-01-01"),
+                null,
+                false,
+                0,
+                10,
+                null
+            )
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            "batchSize[0] must be a positive integer."
+        )
+    );
+  }
+
+  @Test
+  public void testInvalidMarkAsUnusedWithLimit()
+  {
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            DruidException.class,
+            () -> new KillUnusedSegmentsTask(
+                null,
+                DATA_SOURCE,
+                Intervals.of("2018-01-01/2020-01-01"),
+                null,
+                true,
+                10,
+                10,
+                null
+            )
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            "limit[10] cannot be provided when markAsUnused is enabled."
+        )
+    );
+  }
+
   @Test
   public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit()
   {
@@ -359,7 +769,8 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             null,
             false,
             5,
-            10
+            10,
+            null
         );
     Assert.assertEquals(2, (int) task.getNumTotalBatches());
   }
@@ -387,7 +798,9 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
     try {
       Object payload = getObjectMapper().readValue(
           taskRunner.getTaskReportsFile(),
-          new TypeReference<Map<String, TaskReport>>() { }
+          new TypeReference<Map<String, TaskReport>>()
+          {
+          }
       ).get(KillTaskReport.REPORT_KEY).getPayload();
       return getObjectMapper().convertValue(payload, 
KillTaskReport.Stats.class);
     }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 94d21c144dc..002c70262cb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -903,7 +903,13 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
 
     // manually create local segments files
     List<File> segmentFiles = new ArrayList<>();
-    for (DataSegment segment : 
mdc.retrieveUnusedSegmentsForInterval("test_kill_task", 
Intervals.of("2011-04-01/P4D"))) {
+    final List<DataSegment> unusedSegments = 
mdc.retrieveUnusedSegmentsForInterval(
+        "test_kill_task",
+        Intervals.of("2011-04-01/P4D"),
+        null,
+        null
+    );
+    for (DataSegment segment : unusedSegments) {
       File file = new File((String) segment.getLoadSpec().get("path"));
       FileUtils.mkdirp(file.getParentFile());
       Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY);
@@ -918,6 +924,7 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
             null,
             false,
             null,
+            null,
             null
         );
 
@@ -993,7 +1000,13 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
 
     // manually create local segments files
     List<File> segmentFiles = new ArrayList<>();
-    for (DataSegment segment : 
mdc.retrieveUnusedSegmentsForInterval("test_kill_task", 
Intervals.of("2011-04-01/P4D"))) {
+    final List<DataSegment> unusedSegments = 
mdc.retrieveUnusedSegmentsForInterval(
+        "test_kill_task",
+        Intervals.of("2011-04-01/P4D"),
+        null,
+        null
+    );
+    for (DataSegment segment : unusedSegments) {
       File file = new File((String) segment.getLoadSpec().get("path"));
       FileUtils.mkdirp(file.getParentFile());
       Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY);
@@ -1009,7 +1022,8 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
             null,
             false,
             null,
-            maxSegmentsToKill
+            maxSegmentsToKill,
+            null
         );
 
     final TaskStatus status = runTask(killUnusedSegmentsTask);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index cdaf5b9b359..6897e69c26b 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -926,7 +926,7 @@ public class OverlordResourceTest
         auditManager
     );
 
-    Task task = new KillUnusedSegmentsTask("kill_all", "allow", 
Intervals.ETERNITY, null, false, 10, null);
+    Task task = new KillUnusedSegmentsTask("kill_all", "allow", 
Intervals.ETERNITY, null, false, 10, null, null);
     overlordResource.taskPost(task, req);
 
     Assert.assertTrue(auditEntryCapture.hasCaptured());
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 2fc80adceac..f42a300de5f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -34,6 +34,7 @@ import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.PartialShardSpec;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -43,7 +44,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Stream;
 
 public class TestIndexerMetadataStorageCoordinator implements 
IndexerMetadataStorageCoordinator
 {
@@ -104,31 +104,21 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
     return ImmutableList.of();
   }
 
-  @Override
-  public List<DataSegment> retrieveUnusedSegmentsForInterval(String 
dataSource, Interval interval)
-  {
-    synchronized (unusedSegments) {
-      return ImmutableList.copyOf(unusedSegments);
-    }
-  }
-
   @Override
   public List<DataSegment> retrieveUnusedSegmentsForInterval(
       String dataSource,
       Interval interval,
-      @Nullable Integer limit
+      @Nullable Integer limit,
+      @Nullable DateTime maxUsedStatusLastUpdatedTime
   )
   {
     synchronized (unusedSegments) {
-      Stream<DataSegment> resultStream = unusedSegments.stream();
-
-      resultStream = resultStream.filter(ds -> !nuked.contains(ds));
-
-      if (limit != null) {
-        resultStream = resultStream.limit(limit);
-      }
-
-      return ImmutableList.copyOf(resultStream.iterator());
+      return ImmutableList.copyOf(
+          unusedSegments.stream()
+                        .filter(ds -> !nuked.contains(ds))
+                        .limit(limit != null ? limit : Long.MAX_VALUE)
+                        .iterator()
+      );
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
index 279c6699ff9..e5656ff3975 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
@@ -21,16 +21,17 @@ package org.apache.druid.client.indexing;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
+import org.apache.druid.error.InvalidInput;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.Objects;
 
 /**
- * Client representation of 
org.apache.druid.indexing.common.task.KillUnusedSegmentsTask. JSON 
searialization
- * fields of this class must correspond to those of 
org.apache.druid.indexing.common.task.KillUnusedSegmentsTask, except
- * for "id" and "context" fields.
+ * Client representation of {@link 
org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}. JSON 
serialization
+ * fields of this class must correspond to those of {@link 
org.apache.druid.indexing.common.task.KillUnusedSegmentsTask},
+ * except for {@code id} and {@code context} fields.
  */
 public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
 {
@@ -42,6 +43,7 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
   private final Boolean markAsUnused;
   private final Integer batchSize;
   @Nullable private final Integer limit;
+  @Nullable private final DateTime maxUsedStatusLastUpdatedTime;
 
   @JsonCreator
   public ClientKillUnusedSegmentsTaskQuery(
@@ -50,16 +52,23 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
       @JsonProperty("interval") Interval interval,
       @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
       @JsonProperty("batchSize") Integer batchSize,
-      @JsonProperty("limit") Integer limit
+      @JsonProperty("limit") @Nullable Integer limit,
+      @JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime 
maxUsedStatusLastUpdatedTime
   )
   {
-    this.id = Preconditions.checkNotNull(id, "id");
+    if (id == null) {
+      throw InvalidInput.exception("kill task id cannot be null");
+    }
+    if (limit != null && limit <= 0) {
+      throw InvalidInput.exception("limit[%d] must be a positive integer.", 
limit);
+    }
+    this.id = id;
     this.dataSource = dataSource;
     this.interval = interval;
     this.markAsUnused = markAsUnused;
     this.batchSize = batchSize;
-    Preconditions.checkArgument(limit == null || limit > 0, "limit must be > 
0");
     this.limit = limit;
+    this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
   }
 
   @JsonProperty
@@ -116,6 +125,13 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
     return limit;
   }
 
+  @JsonProperty
+  @Nullable
+  public DateTime getMaxUsedStatusLastUpdatedTime()
+  {
+    return maxUsedStatusLastUpdatedTime;
+  }
+
 
   @Override
   public boolean equals(Object o)
@@ -132,12 +148,13 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
            && Objects.equals(interval, that.interval)
            && Objects.equals(markAsUnused, that.markAsUnused)
            && Objects.equals(batchSize, that.batchSize)
-           && Objects.equals(limit, that.limit);
+           && Objects.equals(limit, that.limit)
+           && Objects.equals(maxUsedStatusLastUpdatedTime, 
that.maxUsedStatusLastUpdatedTime);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(id, dataSource, interval, markAsUnused, batchSize, 
limit);
+    return Objects.hash(id, dataSource, interval, markAsUnused, batchSize, 
limit, maxUsedStatusLastUpdatedTime);
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index dd0f7d8c98a..31c97533900 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -24,6 +24,7 @@ import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.PartialShardSpec;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -126,14 +127,6 @@ public interface IndexerMetadataStorageCoordinator
       Segments visibility
   );
 
-  /**
-   * see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)}
-   */
-  default List<DataSegment> retrieveUnusedSegmentsForInterval(String 
dataSource, Interval interval)
-  {
-    return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
-  }
-
   /**
    * Retrieve all published segments which include ONLY data within the given 
interval and are marked as unused from the
    * metadata store.
@@ -141,6 +134,10 @@ public interface IndexerMetadataStorageCoordinator
    * @param dataSource  The data source the segments belong to
    * @param interval    Filter the data segments to ones that include data in 
this interval exclusively.
   * @param limit The maximum number of unused segments to retrieve. If null, no limit is applied.
+   * @param maxUsedStatusLastUpdatedTime The maximum {@code 
used_status_last_updated} time. Any unused segment in {@code interval}
+   *                                   with {@code used_status_last_updated} 
no later than this time will be included in the
+   *                                   kill task. Segments without {@code 
used_status_last_updated} time (due to an upgrade
+   *                                   from legacy Druid) will have {@code 
maxUsedStatusLastUpdatedTime} ignored.
    *
    * @return DataSegments which include ONLY data within the requested 
interval and are marked as unused. Segments NOT
    * returned here may include data in the interval
@@ -148,7 +145,8 @@ public interface IndexerMetadataStorageCoordinator
   List<DataSegment> retrieveUnusedSegmentsForInterval(
       String dataSource,
       Interval interval,
-      @Nullable Integer limit
+      @Nullable Integer limit,
+      @Nullable DateTime maxUsedStatusLastUpdatedTime
   );
 
   /**
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 69dca46ea1c..0ef488aed40 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -229,30 +229,34 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
     );
   }
 
-  @Override
-  public List<DataSegment> retrieveUnusedSegmentsForInterval(final String dataSource, final Interval interval)
-  {
-    return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
-  }
-
   @Override
   public List<DataSegment> retrieveUnusedSegmentsForInterval(
       String dataSource,
       Interval interval,
-      @Nullable Integer limit
+      @Nullable Integer limit,
+      @Nullable DateTime maxUsedStatusLastUpdatedTime
   )
   {
     final List<DataSegment> matchingSegments = connector.inReadOnlyTransaction(
         (handle, status) -> {
           try (final CloseableIterator<DataSegment> iterator =
                   SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
-                       .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit, null, null)) {
+                                           .retrieveUnusedSegments(
+                                               dataSource,
+                                               Collections.singletonList(interval),
+                                               limit,
+                                               null,
+                                               null,
+                                               maxUsedStatusLastUpdatedTime
+                                           )
+          ) {
             return ImmutableList.copyOf(iterator);
           }
         }
     );
 
-    log.info("Found %,d unused segments for %s for interval %s.", matchingSegments.size(), dataSource, interval);
+    log.info("Found [%,d] unused segments for datasource[%s] in interval[%s] with maxUsedStatusLastUpdatedTime[%s].",
+             matchingSegments.size(), dataSource, interval, maxUsedStatusLastUpdatedTime);
     return matchingSegments;
   }
 
diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
index eb8a36bc3a7..94c2fae60fd 100644
--- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
@@ -163,17 +163,25 @@ public interface SegmentsMetadataManager
   Set<String> retrieveAllDataSourceNames();
 
   /**
-   * Returns top N unused segment intervals with the start time no earlier than the specified start time (if not null)
-   * and with the end time no later than the specified maxEndTime and with sed_status_last_updated time no later than
-   * maxLastUsedTime when ordered by segment start time, end time. Any segment having no used_status_last_updated time
-   * due to upgrade from legacy Druid means maxUsedFlagLastUpdatedTime is ignored for that segment.
+   * Returns a list of up to {@code limit} unused segment intervals for the specified datasource. Segments are filtered based on the following criteria:
+   *
+   * <li> The start time of the segment must be no earlier than the specified {@code minStartTime} (if not null). </li>
+   * <li> The end time of the segment must be no later than the specified {@code maxEndTime}. </li>
+   * <li> The {@code used_status_last_updated} time of the segment must be no later than {@code maxUsedStatusLastUpdatedTime}.
+   *      Segments that have no {@code used_status_last_updated} time (due to an upgrade from legacy Druid) will
+   *      have {@code maxUsedStatusLastUpdatedTime} ignored. </li>
+   *
+   * <p>
+   * The list of intervals is ordered by segment start time and then by end time.
+   * </p>
    */
   List<Interval> getUnusedSegmentIntervals(
       String dataSource,
       DateTime minStartTime,
       DateTime maxEndTime,
       int limit,
-      DateTime maxUsedFlagLastUpdatedTime);
+      DateTime maxUsedStatusLastUpdatedTime
+  );
 
   @VisibleForTesting
   void poll();
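[Editor's note, not part of the patch: the ordering contract documented for getUnusedSegmentIntervals() above can be sketched as follows. This is a hypothetical illustration, not Druid code; the class name and the modeling of intervals as epoch-millis pairs are invented for brevity.]

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the documented contract: intervals are returned
// sorted by start time, ties broken by end time, truncated to the limit.
public class IntervalOrderSketch
{
    // Intervals are modeled as [start, end] pairs of epoch millis for brevity.
    public static List<long[]> orderAndLimit(List<long[]> intervals, int limit)
    {
        return intervals.stream()
                        .sorted(Comparator.<long[]>comparingLong(i -> i[0])
                                          .thenComparingLong(i -> i[1]))
                        .limit(limit)
                        .collect(Collectors.toList());
    }
}
```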
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 12d43ec5b76..0b423006152 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -687,7 +687,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
           }
 
           try (final CloseableIterator<DataSegment> iterator =
-                   queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null)) {
+                   queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null, null)) {
             while (iterator.hasNext()) {
               final DataSegment dataSegment = iterator.next();
               timeline.addSegments(Iterators.singletonIterator(dataSegment));
@@ -994,7 +994,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
                   ? Intervals.ONLY_ETERNITY
                   : Collections.singletonList(interval);
           try (final CloseableIterator<DataSegment> iterator =
-                   queryTool.retrieveUnusedSegments(datasource, intervals, limit, lastSegmentId, sortOrder)) {
+                   queryTool.retrieveUnusedSegments(datasource, intervals, limit, lastSegmentId, sortOrder, null)) {
             return ImmutableList.copyOf(iterator);
           }
         }
@@ -1138,7 +1138,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
       @Nullable final DateTime minStartTime,
       final DateTime maxEndTime,
       final int limit,
-      DateTime maxUsedFlagLastUpdatedTime
+      DateTime maxUsedStatusLastUpdatedTime
   )
   {
    // Note that we handle the case where used_status_last_updated IS NULL here to allow smooth transition to Druid version that uses used_status_last_updated column
@@ -1162,7 +1162,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
                 .setMaxRows(limit)
                 .bind("dataSource", dataSource)
                 .bind("end", maxEndTime.toString())
-                .bind("used_status_last_updated", maxUsedFlagLastUpdatedTime.toString())
+                .bind("used_status_last_updated", maxUsedStatusLastUpdatedTime.toString())
                 .map(
                     new BaseResultSetMapper<Interval>()
                     {
@@ -1182,7 +1182,6 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
 
             Iterator<Interval> iter = sql.iterator();
 
-
             List<Interval> result = Lists.newArrayListWithCapacity(limit);
             for (int i = 0; i < limit && iter.hasNext(); i++) {
               try {
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 545acb84507..3bd7c48ad05 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.PreparedBatch;
@@ -117,7 +118,16 @@ public class SqlSegmentsMetadataQuery
       final Collection<Interval> intervals
   )
   {
-    return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null, null, null);
+    return retrieveSegments(
+        dataSource,
+        intervals,
+        IntervalMode.OVERLAPS,
+        true,
+        null,
+        null,
+        null,
+        null
+    );
   }
 
   /**
@@ -135,6 +145,10 @@ public class SqlSegmentsMetadataQuery
    *                      lexigraphically if sortOrder is DESC.
    * @param sortOrder     Specifies the order with which to return the matching segments by start time, end time.
    *                      A null value indicates that order does not matter.
+   * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code intervals}
+   *                                   with {@code used_status_last_updated} no later than this time will be included in the
+   *                                   iterator. Segments without {@code used_status_last_updated} time (due to an upgrade
+   *                                   from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored
 
    * Returns a closeable iterator. You should close it when you are done.
    */
@@ -143,10 +157,20 @@ public class SqlSegmentsMetadataQuery
       final Collection<Interval> intervals,
       @Nullable final Integer limit,
       @Nullable final String lastSegmentId,
-      @Nullable final SortOrder sortOrder
+      @Nullable final SortOrder sortOrder,
+      @Nullable final DateTime maxUsedStatusLastUpdatedTime
   )
   {
-    return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit, lastSegmentId, sortOrder);
+    return retrieveSegments(
+        dataSource,
+        intervals,
+        IntervalMode.CONTAINS,
+        false,
+        limit,
+        lastSegmentId,
+        sortOrder,
+        maxUsedStatusLastUpdatedTime
+    );
   }
 
   /**
@@ -241,6 +265,7 @@ public class SqlSegmentsMetadataQuery
                   true,
                   null,
                   null,
+                  null,
                   null
               ),
               DataSegment::getId
@@ -379,12 +404,13 @@ public class SqlSegmentsMetadataQuery
       final boolean used,
       @Nullable final Integer limit,
       @Nullable final String lastSegmentId,
-      @Nullable final SortOrder sortOrder
+      @Nullable final SortOrder sortOrder,
+      @Nullable final DateTime maxUsedStatusLastUpdatedTime
   )
   {
     if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
       return CloseableIterators.withEmptyBaggage(
-          retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder)
+          retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
       );
     } else {
      final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
@@ -399,7 +425,8 @@ public class SqlSegmentsMetadataQuery
             used,
             limitPerBatch,
             lastSegmentId,
-            sortOrder
+            sortOrder,
+            maxUsedStatusLastUpdatedTime
         );
         if (limitPerBatch != null) {
          // If limit is provided, we need to shrink the limit for subsequent batches or circuit break if
@@ -425,7 +452,8 @@ public class SqlSegmentsMetadataQuery
       final boolean used,
       @Nullable final Integer limit,
       @Nullable final String lastSegmentId,
-      @Nullable final SortOrder sortOrder
+      @Nullable final SortOrder sortOrder,
+      @Nullable final DateTime maxUsedStatusLastUpdatedTime
   )
   {
    // Check if the intervals all support comparing as strings. If so, bake them into the SQL.
@@ -438,6 +466,12 @@ public class SqlSegmentsMetadataQuery
      appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
     }
 
+    // Add the used_status_last_updated time filter only for unused segments when maxUsedStatusLastUpdatedTime is non-null.
+    final boolean addMaxUsedLastUpdatedTimeFilter = !used && maxUsedStatusLastUpdatedTime != null;
+    if (addMaxUsedLastUpdatedTimeFilter) {
+      sb.append(" AND (used_status_last_updated IS NOT NULL AND used_status_last_updated <= :used_status_last_updated)");
+    }
+
     if (lastSegmentId != null) {
       sb.append(
           StringUtils.format(
@@ -462,10 +496,16 @@ public class SqlSegmentsMetadataQuery
         .setFetchSize(connector.getStreamingFetchSize())
         .bind("used", used)
         .bind("dataSource", dataSource);
+
+    if (addMaxUsedLastUpdatedTimeFilter) {
+      sql.bind("used_status_last_updated", maxUsedStatusLastUpdatedTime.toString());
+    }
+
     if (lastSegmentId != null) {
       sql.bind("id", lastSegmentId);
     }
-    if (null != limit) {
+
+    if (limit != null) {
       sql.setMaxRows(limit);
     }
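[Editor's note, not part of the patch: the SQL predicate added above admits a row only when its `used_status_last_updated` is non-null and does not exceed the cutoff. A minimal sketch of that eligibility rule, using an invented class name and `java.time` instead of Joda-Time:]

```java
import java.time.Instant;

// Hypothetical sketch of the predicate appended to the segments query:
// " AND (used_status_last_updated IS NOT NULL
//        AND used_status_last_updated <= :used_status_last_updated)".
public class MaxUsedStatusFilterSketch
{
    // lastUpdated may be null for segments carried over from legacy Druid;
    // this predicate excludes such rows (the NULL case is handled separately
    // elsewhere, e.g. in SqlSegmentsMetadataManager).
    public static boolean matches(Instant lastUpdated, Instant maxUsedStatusLastUpdatedTime)
    {
        if (maxUsedStatusLastUpdatedTime == null) {
            return true; // no filter requested; clause is not added at all
        }
        return lastUpdated != null && !lastUpdated.isAfter(maxUsedStatusLastUpdatedTime);
    }
}
```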
 
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index 6a8e515b327..422803492d8 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.rpc.ServiceRetryPolicy;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -73,33 +74,20 @@ public interface OverlordClient
 
   /**
    * Run a "kill" task for a particular datasource and interval. Shortcut to {@link #runTask(String, Object)}.
-   *
-   * The kill task deletes all unused segment records from deep storage and the metadata store. The task runs
-   * asynchronously after the API call returns. The resolved future is the ID of the task, which can be used to
-   * monitor its progress through the {@link #taskStatus(String)} API.
-   *
-   * @param idPrefix   Descriptive prefix to include at the start of task IDs
-   * @param dataSource Datasource to kill
-   * @param interval   Interval to kill
-   *
-   * @return future with task ID
-   */
-  default ListenableFuture<String> runKillTask(String idPrefix, String dataSource, Interval interval)
-  {
-    return runKillTask(idPrefix, dataSource, interval, null);
-  }
-
-  /**
-   * Run a "kill" task for a particular datasource and interval. Shortcut to {@link #runTask(String, Object)}.
-   *
    * The kill task deletes all unused segment records from deep storage and the metadata store. The task runs
    * asynchronously after the API call returns. The resolved future is the ID of the task, which can be used to
    * monitor its progress through the {@link #taskStatus(String)} API.
    *
    * @param idPrefix   Descriptive prefix to include at the start of task IDs
    * @param dataSource Datasource to kill
-   * @param interval   Interval to kill
+   * @param interval   Umbrella interval to be considered by the kill task. Note that unused segments falling in this
+   *                   widened umbrella interval may have different {@code used_status_last_updated} time, so the kill task
+   *                   should also filter by {@code maxUsedStatusLastUpdatedTime}
    * @param maxSegmentsToKill  The maximum number of segments to kill
+   * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval}
+   *                                   with {@code used_status_last_updated} no later than this time will be included in the
+   *                                   kill task. Segments without {@code used_status_last_updated} time (due to an upgrade
+   *                                   from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored
    *
    * @return future with task ID
    */
@@ -107,7 +95,8 @@ public interface OverlordClient
       String idPrefix,
       String dataSource,
       Interval interval,
-      @Nullable Integer maxSegmentsToKill
+      @Nullable Integer maxSegmentsToKill,
+      @Nullable DateTime maxUsedStatusLastUpdatedTime
   )
   {
    final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval);
@@ -117,7 +106,8 @@ public interface OverlordClient
         interval,
         false,
         null,
-        maxSegmentsToKill
+        maxSegmentsToKill,
+        maxUsedStatusLastUpdatedTime
     );
    return FutureUtils.transform(runTask(taskId, taskQuery), ignored -> taskId);
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index da7847f4505..fb166362345 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -20,10 +20,10 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.JodaUtils;
@@ -37,6 +37,7 @@ import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
+import org.joda.time.Duration;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -46,13 +47,19 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
+ * <p>
  * Completely removes information about unused segments who have an interval end that comes before
- * now - {@link #retainDuration} from the metadata store. retainDuration can be a positive or negative duration,
- * negative meaning the interval end target will be in the future. Also, retainDuration can be ignored,
- * meaning that there is no upper bound to the end interval of segments that will be killed. This action is called
- * "to kill a segment".
+ * now - {@link #durationToRetain} from the metadata store. {@link #durationToRetain} can be a positive or negative duration,
+ * negative meaning the interval end target will be in the future. Also, {@link #durationToRetain} can be ignored if
+ * {@link #ignoreDurationToRetain} is enabled, meaning that there is no upper bound to the end interval of segments that
+ * will be killed. The umbrella interval of the unused segments per datasource to be killed is determined by
+ * {@link #findIntervalForKill(String, DateTime)}, which takes into account the configured {@link #bufferPeriod}. However,
+ * the kill task needs to check again for max {@link #bufferPeriod} for the unused segments in the widened interval
+ * as there can be multiple unused segments with different {@code used_status_last_updated} time.
+ * </p>
  * <p>
- * See org.apache.druid.indexing.common.task.KillUnusedSegmentsTask.
+ * See {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}.
+ * </p>
  */
 public class KillUnusedSegments implements CoordinatorDuty
 {
@@ -63,9 +70,9 @@ public class KillUnusedSegments implements CoordinatorDuty
                 && (KILL_TASK_TYPE.equals(status.getType()) && status.getId().startsWith(TASK_ID_PREFIX));
   private static final Logger log = new Logger(KillUnusedSegments.class);
 
-  private final long period;
-  private final long retainDuration;
-  private final boolean ignoreRetainDuration;
+  private final Duration period;
+  private final Duration durationToRetain;
+  private final boolean ignoreDurationToRetain;
   private final int maxSegmentsToKill;
 
   /**
@@ -73,7 +80,7 @@ public class KillUnusedSegments implements CoordinatorDuty
    * datasource.
    */
   private final Map<String, DateTime> datasourceToLastKillIntervalEnd;
-  private long lastKillTime = 0;
+  private DateTime lastKillTime;
   private final long bufferPeriod;
 
   private final SegmentsMetadataManager segmentsMetadataManager;
@@ -85,32 +92,37 @@ public class KillUnusedSegments implements CoordinatorDuty
       DruidCoordinatorConfig config
   )
   {
-    this.period = config.getCoordinatorKillPeriod().getMillis();
-    Preconditions.checkArgument(
-        this.period >= config.getCoordinatorIndexingPeriod().getMillis(),
-        "coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod"
-    );
-
-    this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain();
-    this.retainDuration = config.getCoordinatorKillDurationToRetain().getMillis();
-    if (this.ignoreRetainDuration) {
+    if (config.getCoordinatorKillPeriod().getMillis() < config.getCoordinatorIndexingPeriod().getMillis()) {
+      throw InvalidInput.exception(
+          "druid.coordinator.kill.period[%s] must be >= druid.coordinator.period.indexingPeriod[%s]",
+          config.getCoordinatorKillPeriod(),
+          config.getCoordinatorIndexingPeriod()
+      );
+    }
+    if (config.getCoordinatorKillMaxSegments() < 0) {
+      throw InvalidInput.exception(
+          "druid.coordinator.kill.maxSegments[%s] is invalid. It must be a positive integer.",
+          config.getCoordinatorKillMaxSegments()
+      );
+    }
+    this.period = config.getCoordinatorKillPeriod();
+    this.ignoreDurationToRetain = config.getCoordinatorKillIgnoreDurationToRetain();
+    this.durationToRetain = config.getCoordinatorKillDurationToRetain();
+    if (this.ignoreDurationToRetain) {
       log.debug(
-          "druid.coordinator.kill.durationToRetain [%s] will be ignored when discovering segments to kill "
-          + "because you have set druid.coordinator.kill.ignoreDurationToRetain to True.",
-          this.retainDuration
+          "druid.coordinator.kill.durationToRetain[%s] will be ignored when discovering segments to kill "
+          + "because druid.coordinator.kill.ignoreDurationToRetain is set to true.",
+          this.durationToRetain
       );
     }
     this.bufferPeriod = config.getCoordinatorKillBufferPeriod().getMillis();
-
     this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments();
-    Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
-
     datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();
 
     log.info(
-        "Kill Task scheduling enabled with period [%s], retainDuration [%s], bufferPeriod [%s], maxSegmentsToKill [%s]",
+        "Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s]",
         this.period,
-        this.ignoreRetainDuration ? "IGNORING" : this.retainDuration,
+        this.ignoreDurationToRetain ? "IGNORING" : this.durationToRetain,
         this.bufferPeriod,
         this.maxSegmentsToKill
     );
@@ -122,9 +134,12 @@ public class KillUnusedSegments implements CoordinatorDuty
   @Override
  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
-    final long currentTimeMillis = System.currentTimeMillis();
-    if (lastKillTime + period > currentTimeMillis) {
-      log.debug("Skipping kill of unused segments as kill period has not elapsed yet.");
+    if (!canDutyRun()) {
+      log.debug(
+          "Skipping KillUnusedSegments until period[%s] has elapsed after lastKillTime[%s].",
+          period,
+          lastKillTime
+      );
       return params;
     }
 
@@ -159,10 +174,9 @@ public class KillUnusedSegments implements CoordinatorDuty
        dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
       }
 
-      log.debug("Killing unused segments in datasources: %s", dataSourcesToKill);
-      lastKillTime = System.currentTimeMillis();
+      log.debug("Killing unused segments for datasources[%s]", dataSourcesToKill);
+      lastKillTime = DateTimes.nowUtc();
       taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
-
     }
 
    // any datasources that are no longer being considered for kill should have their
@@ -196,26 +210,31 @@ public class KillUnusedSegments implements CoordinatorDuty
              + "on the next coordinator cycle.", submittedTasks, availableKillTaskSlots));
           break;
         }
-        final Interval intervalToKill = findIntervalForKill(dataSource);
+        final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
+        final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime);
         if (intervalToKill == null) {
           datasourceToLastKillIntervalEnd.remove(dataSource);
           continue;
         }
 
         try {
-          FutureUtils.getUnchecked(overlordClient.runKillTask(
-              TASK_ID_PREFIX,
-              dataSource,
-              intervalToKill,
-              maxSegmentsToKill
-          ), true);
+          FutureUtils.getUnchecked(
+              overlordClient.runKillTask(
+                  TASK_ID_PREFIX,
+                  dataSource,
+                  intervalToKill,
+                  maxSegmentsToKill,
+                  maxUsedStatusLastUpdatedTime
+              ),
+              true
+          );
           ++submittedTasks;
          datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
         }
         catch (Exception ex) {
-          log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource);
+          log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill);
           if (Thread.currentThread().isInterrupted()) {
-            log.warn("skipping kill task scheduling because thread is interrupted.");
+            log.warn("Skipping kill task scheduling because thread is interrupted.");
             break;
           }
         }
@@ -244,14 +263,13 @@ public class KillUnusedSegments implements CoordinatorDuty
   * Calculates the interval for which segments are to be killed in a datasource.
    */
   @Nullable
-  private Interval findIntervalForKill(String dataSource)
+  private Interval findIntervalForKill(String dataSource, DateTime maxUsedStatusLastUpdatedTime)
   {
-    final DateTime maxEndTime = ignoreRetainDuration
+    final DateTime maxEndTime = ignoreDurationToRetain
                                 ? DateTimes.COMPARE_DATE_AS_STRING_MAX
-                                : DateTimes.nowUtc().minus(retainDuration);
-
+                                : DateTimes.nowUtc().minus(durationToRetain);
     List<Interval> unusedSegmentIntervals = segmentsMetadataManager
-        .getUnusedSegmentIntervals(dataSource, datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill, DateTimes.nowUtc().minus(bufferPeriod));
+        .getUnusedSegmentIntervals(dataSource, datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill, maxUsedStatusLastUpdatedTime);
 
     if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
       return null;
@@ -270,6 +288,11 @@ public class KillUnusedSegments implements CoordinatorDuty
     );
   }
 
+  private boolean canDutyRun()
+  {
+    return lastKillTime == null || !DateTimes.nowUtc().isBefore(lastKillTime.plus(period));
+  }
+
   @VisibleForTesting
  static int getKillTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio, int maxKillTaskSlots)
   {
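[Editor's note, not part of the patch: the scheduling check introduced as canDutyRun() above, which replaces the millisecond arithmetic on lastKillTime, can be sketched as follows. This is a hypothetical illustration using `java.time` rather than Joda-Time; the class name is invented.]

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of canDutyRun(): the duty may run if it has never run
// before, or once the configured period has fully elapsed since the last kill.
public class KillScheduleSketch
{
    public static boolean canDutyRun(Instant lastKillTime, Duration period, Instant now)
    {
        // Equivalent to: lastKillTime == null || now >= lastKillTime + period
        return lastKillTime == null || !now.isBefore(lastKillTime.plus(period));
    }
}
```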
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index a539a48ecb7..1546544029e 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -380,7 +380,7 @@ public class DataSourcesResource
     final Interval theInterval = Intervals.of(interval.replace('_', '/'));
     try {
       final String killTaskId = FutureUtils.getUnchecked(
-          overlordClient.runKillTask("api-issued", dataSourceName, theInterval, null),
+          overlordClient.runKillTask("api-issued", dataSourceName, theInterval, null, null),
           true
       );
       auditManager.doAudit(
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
index 60edff93077..0059d048e06 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
@@ -48,7 +48,8 @@ public class ClientKillUnusedSegmentsTaskQueryTest
         INTERVAL,
         true,
         BATCH_SIZE,
-        LIMIT
+        LIMIT,
+        null
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 4c3534feacd..0626792f1fd 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -372,10 +372,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
 
   private void markAllSegmentsUnused()
   {
-    markAllSegmentsUnused(SEGMENTS);
+    markAllSegmentsUnused(SEGMENTS, DateTimes.nowUtc());
   }
 
-  private void markAllSegmentsUnused(Set<DataSegment> segments)
+  private void markAllSegmentsUnused(Set<DataSegment> segments, DateTime 
usedStatusLastUpdatedTime)
   {
     for (final DataSegment segment : segments) {
       Assert.assertEquals(
@@ -386,7 +386,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
                    "UPDATE %s SET used = false, used_status_last_updated = :used_status_last_updated WHERE id = :id",
                    derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable()
                );
-                return handle.createStatement(request).bind("id", segment.getId().toString()).bind("used_status_last_updated", DateTimes.nowUtc().toString()).execute();
+                return handle.createStatement(request)
+                             .bind("id", segment.getId().toString())
+                             .bind("used_status_last_updated", usedStatusLastUpdatedTime.toString()
+                ).execute();
               }
           )
       );
@@ -977,7 +980,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveSegmentForId()
   {
     insertUsedSegments(ImmutableSet.of(defaultSegment));
-    markAllSegmentsUnused(ImmutableSet.of(defaultSegment));
+    markAllSegmentsUnused(ImmutableSet.of(defaultSegment), DateTimes.nowUtc());
    Assert.assertEquals(defaultSegment, coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), true));
   }
 
@@ -1147,11 +1150,12 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingSingleIntervalAndNoLimit() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
         DS.WIKI,
         Intervals.of("1900/3000"),
+        null,
         null
     );
 
@@ -1163,13 +1167,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitAtRange() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final int requestedLimit = segments.size();
     final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
         DS.WIKI,
         Intervals.of("1900/3000"),
-        requestedLimit
+        requestedLimit,
+        null
     );
 
     Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
@@ -1180,13 +1185,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitInRange() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final int requestedLimit = segments.size() - 1;
     final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
         DS.WIKI,
         Intervals.of("1900/3000"),
-        requestedLimit
+        requestedLimit,
+        null
     );
 
     Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
@@ -1197,13 +1203,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitOutOfRange() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final int limit = segments.size() + 1;
     final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
         DS.WIKI,
         Intervals.of("1900/3000"),
-        limit
+        limit,
+        null
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
     Assert.assertTrue(actualUnusedSegments.containsAll(segments));
@@ -1213,7 +1220,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingSingleIntervalOutOfRange() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final Interval outOfRangeInterval = Intervals.of("1700/1800");
     Assert.assertTrue(segments.stream()
@@ -1223,7 +1230,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
         DS.WIKI,
         outOfRangeInterval,
-        limit
+        limit,
+        null
     );
     Assert.assertEquals(0, actualUnusedSegments.size());
   }
@@ -1232,12 +1240,13 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
         segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
@@ -1248,12 +1257,13 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingNoIntervalsNoLimitAndNoLastSegmentId() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
         ImmutableList.of(),
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
@@ -1264,7 +1274,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegmentId() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(2033, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     String lastSegmentId = segments.get(9).getId().toString();
     final List<DataSegment> expectedSegmentsAscOrder = segments.stream()
@@ -1274,6 +1284,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableList.of(),
         null,
         lastSegmentId,
+        null,
         null
     );
     Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegments.size());
@@ -1283,7 +1294,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableList.of(),
         null,
         lastSegmentId,
-        SortOrder.ASC
+        SortOrder.ASC,
+        null
     );
     Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegments.size());
     Assert.assertEquals(expectedSegmentsAscOrder, actualUnusedSegments);
@@ -1297,7 +1309,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableList.of(),
         null,
         lastSegmentId,
-        SortOrder.DESC
+        SortOrder.DESC,
+        null
     );
     Assert.assertEquals(expectedSegmentsDescOrder.size(), actualUnusedSegments.size());
     Assert.assertEquals(expectedSegmentsDescOrder, actualUnusedSegments);
@@ -1307,12 +1320,13 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
         segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
         segments.size(),
         null,
+        null,
         null
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
@@ -1323,13 +1337,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final int requestedLimit = segments.size() - 1;
     final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
         segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
         requestedLimit,
         null,
+        null,
         null
     );
     Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
@@ -1340,7 +1355,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingMultipleIntervalsInSingleBatchLimitAndLastSegmentId() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(2034, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final int requestedLimit = segments.size();
     final String lastSegmentId = segments.get(4).getId().toString();
@@ -1348,6 +1363,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
         requestedLimit,
         lastSegmentId,
+        null,
         null
     );
     Assert.assertEquals(segments.size() - 5, actualUnusedSegments.size());
@@ -1361,7 +1377,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingMultipleIntervalsLimitAndLastSegmentId() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final int requestedLimit = segments.size() - 1;
     final String lastSegmentId = segments.get(4).getId().toString();
@@ -1369,6 +1385,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
         requestedLimit,
         lastSegmentId,
+        null,
         null
     );
     Assert.assertEquals(requestedLimit - 4, actualUnusedSegments.size());
@@ -1382,12 +1399,13 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingMultipleIntervals() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegments(
         segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
         segments.size() + 1,
         null,
+        null,
         null
     );
     Assert.assertEquals(segments.size(), actualUnusedSegments.size());
@@ -1398,7 +1416,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   public void testRetrieveUnusedSegmentsUsingIntervalOutOfRange() throws IOException
   {
     final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
-    markAllSegmentsUnused(new HashSet<>(segments));
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
 
     final Interval outOfRangeInterval = Intervals.of("1700/1800");
     Assert.assertTrue(segments.stream()
@@ -1408,11 +1426,82 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableList.of(outOfRangeInterval),
         null,
         null,
-        null
+        null,
+         null
     );
     Assert.assertEquals(0, actualUnusedSegments.size());
   }
 
+  @Test
+  public void testRetrieveUnusedSegmentsWithMaxUsedStatusLastUpdatedTime() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+
+    final Interval interval = Intervals.of("1905/1920");
+
+    final ImmutableList<DataSegment> actualUnusedSegments1 = retrieveUnusedSegments(
+        ImmutableList.of(interval),
+        null,
+        null,
+        null,
+        DateTimes.nowUtc()
+    );
+    Assert.assertEquals(5, actualUnusedSegments1.size());
+
+    final ImmutableList<DataSegment> actualUnusedSegments2 = retrieveUnusedSegments(
+        ImmutableList.of(interval),
+        null,
+        null,
+        null,
+        DateTimes.nowUtc().minusHours(1)
+    );
+    Assert.assertEquals(0, actualUnusedSegments2.size());
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsWithMaxUsedStatusLastUpdatedTime2() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 1950);
+    final List<DataSegment> evenYearSegments = new ArrayList<>();
+    final List<DataSegment> oddYearSegments = new ArrayList<>();
+
+    for (int i = 0; i < segments.size(); i++) {
+      DataSegment dataSegment = segments.get(i);
+      if (i % 2 == 0) {
+        evenYearSegments.add(dataSegment);
+      } else {
+        oddYearSegments.add(dataSegment);
+      }
+    }
+
+    final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(oddYearSegments), maxUsedStatusLastUpdatedTime1);
+
+    final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc();
+    markAllSegmentsUnused(new HashSet<>(evenYearSegments), maxUsedStatusLastUpdatedTime2);
+
+    final Interval interval = Intervals.of("1900/1950");
+
+    final ImmutableList<DataSegment> actualUnusedSegments1 = retrieveUnusedSegments(
+        ImmutableList.of(interval),
+        null,
+        null,
+        null,
+        maxUsedStatusLastUpdatedTime1
+    );
+    Assert.assertEquals(oddYearSegments.size(), actualUnusedSegments1.size());
+
+    final ImmutableList<DataSegment> actualUnusedSegments2 = retrieveUnusedSegments(
+        ImmutableList.of(interval),
+        null,
+        null,
+        null,
+        maxUsedStatusLastUpdatedTime2
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegments2.size());
+  }
+
   @Test
   public void testSimpleUnusedList() throws IOException
   {
@@ -1423,7 +1512,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableSet.copyOf(
             coordinator.retrieveUnusedSegmentsForInterval(
                 defaultSegment.getDataSource(),
-                defaultSegment.getInterval()
+                defaultSegment.getInterval(),
+                null,
+                null
             )
         )
     );
@@ -1439,7 +1530,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         coordinator.retrieveUnusedSegmentsForInterval(
             defaultSegment.getDataSource(),
             defaultSegment.getInterval(),
-            limit
+            limit,
+            null
         )
     );
     Assert.assertEquals(limit, retreivedUnusedSegments.size());
@@ -1551,7 +1643,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
             new Interval(
                 defaultSegment.getInterval().getStart().minus(1),
                 defaultSegment.getInterval().getStart().plus(1)
-            )
+            ),
+            null,
+            null
         ).isEmpty()
     );
   }
@@ -1564,7 +1658,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertTrue(
         coordinator.retrieveUnusedSegmentsForInterval(
             defaultSegment.getDataSource(),
-            new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd())
+            new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd()),
+            null,
+            null
         ).isEmpty()
     );
   }
@@ -1578,7 +1674,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertTrue(
         coordinator.retrieveUnusedSegmentsForInterval(
             defaultSegment.getDataSource(),
-            new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1))
+            new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1)),
+            null,
+            null
         ).isEmpty()
     );
   }
@@ -1591,7 +1689,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertTrue(
         coordinator.retrieveUnusedSegmentsForInterval(
             defaultSegment.getDataSource(),
-            defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1))
+            defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1)),
+            null,
+            null
         ).isEmpty()
     );
   }
@@ -1606,7 +1706,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableSet.copyOf(
             coordinator.retrieveUnusedSegmentsForInterval(
                 defaultSegment.getDataSource(),
-                Intervals.of("2000/2999")
+                Intervals.of("2000/2999"),
+                null,
+                null
             )
         )
     );
@@ -1622,7 +1724,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableSet.copyOf(
             coordinator.retrieveUnusedSegmentsForInterval(
                 defaultSegment.getDataSource(),
-                defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1))
+                defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1)),
+                null,
+                null
             )
         )
     );
@@ -1631,7 +1735,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableSet.copyOf(
             coordinator.retrieveUnusedSegmentsForInterval(
                 defaultSegment.getDataSource(),
-                defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1))
+                defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1)),
+                null,
+                null
             )
         )
     );
@@ -1647,7 +1753,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableSet.copyOf(
             coordinator.retrieveUnusedSegmentsForInterval(
                 defaultSegment.getDataSource(),
-                defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1))
+                defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1)),
+                null,
+                null
             )
         )
     );
@@ -1656,7 +1764,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableSet.copyOf(
             coordinator.retrieveUnusedSegmentsForInterval(
                 defaultSegment.getDataSource(),
-                defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1))
+                defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1)),
+                null,
+                null
             )
         )
     );
@@ -2189,7 +2299,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertEquals(1, identifier3.getShardSpec().getNumCorePartitions());
 
     // now drop the used segment previously loaded:
-    markAllSegmentsUnused(ImmutableSet.of(segment));
+    markAllSegmentsUnused(ImmutableSet.of(segment), DateTimes.nowUtc());
 
     // and final load, this reproduces an issue that could happen with multiple streaming appends,
     // followed by a reindex, followed by a drop, and more streaming data coming in for same interval
@@ -2208,7 +2318,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   }
 
   /**
-   * Slightly different that the above test but that involves reverted compaction
+   * Slightly different from the above test that involves reverted compaction
    * 1) used segments of version = A, id = 0, 1, 2
    * 2) overwrote segments of version = B, id = 0 <= compaction
    * 3) marked segments unused for version = A, id = 0, 1, 2 <= overshadowing
@@ -2354,7 +2464,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
 
     // 5) reverted compaction (by marking B_0 as unused)
     // Revert compaction a manual metadata update which is basically the following two steps:
-    markAllSegmentsUnused(ImmutableSet.of(compactedSegment)); // <- drop compacted segment
+    markAllSegmentsUnused(ImmutableSet.of(compactedSegment), DateTimes.nowUtc()); // <- drop compacted segment
     //        pending: version = A, id = 0,1,2
     //                 version = B, id = 1
     //
@@ -2896,7 +3006,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableSet.copyOf(
             coordinator.retrieveUnusedSegmentsForInterval(
                 existingSegment1.getDataSource(),
-                existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1))
+                existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1)),
+                null,
+                null
             )
         )
     );
@@ -2905,7 +3017,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableSet.copyOf(
             coordinator.retrieveUnusedSegmentsForInterval(
                 existingSegment2.getDataSource(),
-                existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1))
+                existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1)),
+                null,
+                null
             )
         )
     );
@@ -2928,7 +3042,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableSet.copyOf(
             coordinator.retrieveUnusedSegmentsForInterval(
                 existingSegment1.getDataSource(),
-                existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1))
+                existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1)),
+                null,
+                null
             )
         )
     );
@@ -2937,7 +3053,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableSet.copyOf(
             coordinator.retrieveUnusedSegmentsForInterval(
                 existingSegment2.getDataSource(),
-                existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1))
+                existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1)),
+                null,
+                null
             )
         )
     );
@@ -3100,7 +3218,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments));
 
     // Mark the tombstone as unused
-    markAllSegmentsUnused(tombstones);
+    markAllSegmentsUnused(tombstones, DateTimes.nowUtc());
 
     final Collection<DataSegment> allUsedSegments = coordinator.retrieveAllUsedSegments(
         DS.WIKI,
@@ -3154,7 +3272,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments));
 
     // Mark the tombstone as unused
-    markAllSegmentsUnused(tombstones);
+    markAllSegmentsUnused(tombstones, DateTimes.nowUtc());
 
     final Collection<DataSegment> allUsedSegments = coordinator.retrieveAllUsedSegments(
         DS.WIKI,
@@ -3205,7 +3323,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
       final List<Interval> intervals,
       final Integer limit,
       final String lastSegmentId,
-      final SortOrder sortOrder
+      final SortOrder sortOrder,
+      final DateTime maxUsedStatusLastUpdatedTime
   )
   {
     return derbyConnector.inReadOnlyTransaction(
@@ -3217,7 +3336,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
                            derbyConnectorRule.metadataTablesConfigSupplier().get(),
                            mapper
                        )
-                       .retrieveUnusedSegments(DS.WIKI, intervals, limit, lastSegmentId, sortOrder)) {
+                       .retrieveUnusedSegments(DS.WIKI, intervals, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
             return ImmutableList.copyOf(iterator);
           }
         }
diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
index 7ba5916a771..cad3f21f06a 100644
--- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
+++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
@@ -433,6 +433,7 @@ public class OverlordClientImplTest
         null,
         null,
         null,
+        null,
         null
     );
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 5adb345c9fa..a174b9e2264 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -60,6 +60,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 
 /**
@@ -68,10 +69,10 @@ import static org.mockito.ArgumentMatchers.anyString;
 @RunWith(MockitoJUnitRunner.class)
 public class KillUnusedSegmentsTest
 {
-  private static final int MAX_SEGMENTS_TO_KILL = 10;
-  private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardMinutes(2);
+  private static final Duration INDEXING_PERIOD = Duration.standardSeconds(1);
+  private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardSeconds(1);
   private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1);
-  private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1);
+  private static final int MAX_SEGMENTS_TO_KILL = 10;
   private static final String DATASOURCE = "DS1";
 
   @Mock
@@ -174,7 +175,7 @@ public class KillUnusedSegmentsTest
     mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     target.run(params);
     Mockito.verify(overlordClient, Mockito.never())
-           .runKillTask(anyString(), anyString(), any(Interval.class));
+           .runKillTask(anyString(), anyString(), any(Interval.class), anyInt(), any(DateTime.class));
   }
 
   @Test
@@ -192,7 +193,7 @@ public class KillUnusedSegmentsTest
     mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     target.run(params);
     Mockito.verify(overlordClient, Mockito.never())
-           .runKillTask(anyString(), anyString(), any(Interval.class));
+           .runKillTask(anyString(), anyString(), any(Interval.class), anyInt(), any(DateTime.class));
   }
 
   @Test
@@ -364,48 +365,24 @@ public class KillUnusedSegmentsTest
   {
     int limit = config.getCoordinatorKillMaxSegments();
     Mockito.doReturn(Futures.immediateFuture("ok"))
-        .when(overlordClient)
-        .runKillTask(
-            ArgumentMatchers.anyString(),
-            ArgumentMatchers.anyString(),
-            ArgumentMatchers.any(Interval.class),
-            ArgumentMatchers.anyInt());
+           .when(overlordClient)
+           .runKillTask(
+               ArgumentMatchers.anyString(),
+               ArgumentMatchers.anyString(),
+               ArgumentMatchers.any(Interval.class),
+               ArgumentMatchers.anyInt(),
+               ArgumentMatchers.any(DateTime.class));
     target.runInternal(params);
 
     Mockito.verify(overlordClient, Mockito.times(1)).runKillTask(
         ArgumentMatchers.anyString(),
         ArgumentMatchers.eq(DATASOURCE),
         ArgumentMatchers.eq(expectedKillInterval),
-        ArgumentMatchers.eq(limit)
+        ArgumentMatchers.eq(limit),
+        ArgumentMatchers.any()
     );
   }
 
-  private void runAndVerifyKillIntervals(List<Interval> expectedKillIntervals)
-  {
-    int limit = config.getCoordinatorKillMaxSegments();
-    Mockito.doReturn(Futures.immediateFuture("ok"))
-        .when(overlordClient)
-        .runKillTask(
-            ArgumentMatchers.anyString(),
-            ArgumentMatchers.anyString(),
-            ArgumentMatchers.any(Interval.class),
-            ArgumentMatchers.anyInt());
-    for (int i = 0; i < expectedKillIntervals.size(); i++) {
-      target.run(params);
-      verifyState(ImmutableMap.of(DATASOURCE, yearOldSegment.getInterval().getEnd()));
-      verifyStats(9, 1, 10);
-    }
-
-    for (Interval expectedKillInterval : expectedKillIntervals) {
-      Mockito.verify(overlordClient, Mockito.times(1)).runKillTask(
-          ArgumentMatchers.anyString(),
-          ArgumentMatchers.eq(DATASOURCE),
-          ArgumentMatchers.eq(expectedKillInterval),
-          ArgumentMatchers.eq(limit)
-      );
-    }
-  }
-
   private void verifyStats(int availableSlots, int submittedTasks, int maxSlots)
   {
     verifyStats(availableSlots, submittedTasks, maxSlots, 1);
@@ -430,7 +407,8 @@ public class KillUnusedSegmentsTest
         ArgumentMatchers.anyString(),
         ArgumentMatchers.anyString(),
         ArgumentMatchers.any(Interval.class),
-        ArgumentMatchers.anyInt()
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(DateTime.class)
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
index cfb8fec941e..1e2f2d462c4 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -217,7 +217,7 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
       @Nullable final DateTime minStartTime,
       final DateTime maxEndTime,
       final int limit,
-      final DateTime maxUsedFlagLastUpdatedTime
+      final DateTime maxUsedStatusLastUpdatedTime
   )
   {
     return null;
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
index b83ebf67527..089cd2e1d29 100644
--- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
@@ -591,7 +591,7 @@ public class DataSourcesResourceTest
     Interval theInterval = Intervals.of(interval.replace('_', '/'));
 
     OverlordClient overlordClient = EasyMock.createStrictMock(OverlordClient.class);
-    EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", theInterval, null))
+    EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", theInterval, null, null))
             .andReturn(Futures.immediateFuture("kill_task_1"));
     EasyMock.replay(overlordClient, server);
 
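For readers following the change: the new `maxUsedStatusLastUpdatedTime` argument threaded through the tests above is a cutoff, so a kill task only considers unused segments whose `used_status_last_updated` timestamp is at or before (now minus the configured buffer period). A minimal sketch of that cutoff check, using `java.time` instead of Druid's Joda-Time types (the class and method names here are illustrative, not Druid's actual API):

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: mirrors the idea behind maxUsedStatusLastUpdatedTime in the
// tests above. A segment is eligible for a kill task only when it was marked
// unused at or before (now - bufferPeriod).
public class BufferPeriodCutoff
{
  public static boolean isEligibleForKill(Instant usedStatusLastUpdated, Instant now, Duration bufferPeriod)
  {
    // Compute the cutoff; anything marked unused after it is retained.
    Instant maxUsedStatusLastUpdatedTime = now.minus(bufferPeriod);
    return !usedStatusLastUpdated.isAfter(maxUsedStatusLastUpdatedTime);
  }
}
```

With a 30-day buffer, a segment marked unused 30 days ago is killable while one marked unused an hour ago is retained, which is what the two new `testRetrieveUnusedSegmentsWithMaxUsedStatusLastUpdatedTime` tests assert at the metadata-store level.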

