This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ba957a9b97 Add ability to limit the number of segments killed in kill 
task (#14662)
ba957a9b97 is described below

commit ba957a9b97b6bdbecb166db771721cced1dac365
Author: zachjsh <[email protected]>
AuthorDate: Thu Aug 3 22:17:04 2023 -0400

    Add ability to limit the number of segments killed in kill task (#14662)
    
    ### Description
    
    Previously, the `maxSegments` configured for auto kill could be ignored if 
an interval of data for a given datasource had more unused segments than this 
number: the kill task spawned to delete the unused segments in that interval 
would then delete more than the configured `maxSegments`. Now each kill task 
spawned by the auto kill coordinator duty will kill at most `limit` segments. 
This is done by adding a new config property to the `KillUnusedSegmentTas [...]
---
 docs/data-management/delete.md                     |   9 +-
 .../actions/RetrieveUnusedSegmentsAction.java      |  20 ++-
 .../druid/indexing/common/task/ArchiveTask.java    |   2 +-
 .../common/task/KillUnusedSegmentsTask.java        | 119 ++++++++++++-----
 .../druid/indexing/common/task/MoveTask.java       |   2 +-
 .../druid/indexing/common/task/RestoreTask.java    |   2 +-
 .../apache/druid/indexing/common/task/Task.java    |   2 +-
 .../actions/RetrieveSegmentsActionsTest.java       |   2 +-
 ...ClientKillUnusedSegmentsTaskQuerySerdeTest.java |  13 +-
 .../common/task/KillUnusedSegmentsTaskTest.java    | 144 ++++++++++++++++++++-
 .../druid/indexing/overlord/TaskLifecycleTest.java | 101 ++++++++++++++-
 .../TestIndexerMetadataStorageCoordinator.java     |  17 +++
 .../ClientKillUnusedSegmentsTaskQuery.java         |  23 +++-
 .../IndexerMetadataStorageCoordinator.java         |  19 ++-
 .../IndexerSQLMetadataStorageCoordinator.java      |  12 +-
 .../druid/metadata/SqlSegmentsMetadataManager.java |   2 +-
 .../druid/metadata/SqlSegmentsMetadataQuery.java   |  17 ++-
 .../apache/druid/rpc/indexing/OverlordClient.java  |  33 ++++-
 .../coordinator/duty/KillUnusedSegments.java       |  10 +-
 .../druid/server/http/DataSourcesResource.java     |   2 +-
 .../ClientKillUnusedSegmentsTaskQueryTest.java     |  18 ++-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  |  16 +++
 .../druid/rpc/indexing/OverlordClientImplTest.java |   9 +-
 .../coordinator/duty/KillUnusedSegmentsTest.java   |   4 +-
 .../druid/server/http/DataSourcesResourceTest.java |   2 +-
 25 files changed, 526 insertions(+), 74 deletions(-)

diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md
index 260a66a174..9e59c751bc 100644
--- a/docs/data-management/delete.md
+++ b/docs/data-management/delete.md
@@ -96,15 +96,18 @@ The available grammar is:
     "dataSource": <task_datasource>,
     "interval" : <all_unused_segments_in_this_interval_will_die!>,
     "context": <task context>,
-    "batchSize": <optional_batch size>
+    "batchSize": <optional_batch size>,
+    "limit": <the maximum number of segments to delete>
 }
 ```
 
 Some of the parameters used in the task payload are further explained below:
 
-| Parameter    |Default| Explanation                                           
                                                 |
-|--------------|-------|--------------------------------------------------------------------------------------------------------|
+| Parameter   | Default         | Explanation                                  
                                                                                
                                                                                
                                                                                
                                                                               |
+|-------------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | `batchSize`    |100    | Maximum number of segments that are deleted in one 
kill batch. Some operations on the Overlord may get stuck while a `kill` task 
is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, 
a `kill` task splits the list of unused segments to be deleted into smaller 
batches to yield the Overlord resources intermittently to other task 
operations.|
+| `limit`     | null - no limit | Maximum number of segments for the kill task 
to delete.|
+
 
 **WARNING:** The `kill` task permanently removes all information about the 
affected segments from the metadata store and
 deep storage. This operation cannot be undone.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
index f114ec4564..150648858c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
@@ -27,6 +27,8 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 public class RetrieveUnusedSegmentsAction implements 
TaskAction<List<DataSegment>>
@@ -37,14 +39,19 @@ public class RetrieveUnusedSegmentsAction implements 
TaskAction<List<DataSegment
   @JsonIgnore
   private final Interval interval;
 
+  @JsonIgnore
+  private final Integer limit;
+
   @JsonCreator
   public RetrieveUnusedSegmentsAction(
       @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("interval") Interval interval
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("limit") @Nullable Integer limit
   )
   {
     this.dataSource = dataSource;
     this.interval = interval;
+    this.limit = limit;
   }
 
   @JsonProperty
@@ -59,6 +66,13 @@ public class RetrieveUnusedSegmentsAction implements 
TaskAction<List<DataSegment
     return interval;
   }
 
+  @Nullable
+  @JsonProperty
+  public Integer getLimit()
+  {
+    return limit;
+  }
+
   @Override
   public TypeReference<List<DataSegment>> getReturnTypeReference()
   {
@@ -68,7 +82,8 @@ public class RetrieveUnusedSegmentsAction implements 
TaskAction<List<DataSegment
   @Override
   public List<DataSegment> perform(Task task, TaskActionToolbox toolbox)
   {
-    return 
toolbox.getIndexerMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(dataSource,
 interval);
+    return toolbox.getIndexerMetadataStorageCoordinator()
+        .retrieveUnusedSegmentsForInterval(dataSource, interval, limit);
   }
 
   @Override
@@ -83,6 +98,7 @@ public class RetrieveUnusedSegmentsAction implements 
TaskAction<List<DataSegment
     return getClass().getSimpleName() + "{" +
            "dataSource='" + dataSource + '\'' +
            ", interval=" + interval +
+           ", limit=" + limit +
            '}';
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
index 904181a243..42d04316a4 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
@@ -79,7 +79,7 @@ public class ArchiveTask extends AbstractFixedIntervalTask
     // List unused segments
     final List<DataSegment> unusedSegments = toolbox
         .getTaskActionClient()
-        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), 
myLock.getInterval()));
+        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), 
myLock.getInterval(), null));
 
     // Verify none of these segments have versions > lock version
     for (final DataSegment unusedSegment : unusedSegments) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 0d54ae96b0..35653aa230 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -26,8 +26,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -37,6 +37,7 @@ import 
org.apache.druid.indexing.common.actions.SegmentNukeAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskLocks;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.timeline.DataSegment;
@@ -44,6 +45,8 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -61,6 +64,7 @@ import java.util.stream.Collectors;
  */
 public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 {
+  public static final String TYPE = "kill";
   private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
 
   /**
@@ -74,14 +78,16 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
   private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;
 
   private final boolean markAsUnused;
-
   /**
    * Split processing to try and keep each nuke operation relatively short, in 
the case that either
    * the database or the storage layer is particularly slow.
    */
   private final int batchSize;
+  @Nullable private final Integer limit;
+
 
-  // counter included primarily for testing
+  // counters included primarily for testing
+  private int numSegmentsKilled = 0;
   private long numBatchesProcessed = 0;
 
   @JsonCreator
@@ -90,8 +96,9 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
       @JsonProperty("context") Map<String, Object> context,
-      @JsonProperty("markAsUnused") Boolean markAsUnused,
-      @JsonProperty("batchSize") Integer batchSize
+      @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
+      @JsonProperty("batchSize") Integer batchSize,
+      @JsonProperty("limit") @Nullable Integer limit
   )
   {
     super(
@@ -103,6 +110,19 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
     this.markAsUnused = markAsUnused != null && markAsUnused;
     this.batchSize = (batchSize != null) ? batchSize : 
DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
     Preconditions.checkArgument(this.batchSize > 0, "batchSize should be 
greater than zero");
+    if (null != limit && limit <= 0) {
+      throw InvalidInput.exception(
+          "limit [%d] is invalid. It must be a positive integer.",
+          limit
+      );
+    }
+    if (limit != null && markAsUnused != null && markAsUnused) {
+      throw InvalidInput.exception(
+          "limit cannot be provided with markAsUnused.",
+          limit
+      );
+    }
+    this.limit = limit;
   }
 
   @JsonProperty
@@ -119,10 +139,17 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
     return batchSize;
   }
 
+  @Nullable
+  @JsonProperty
+  public Integer getLimit()
+  {
+    return limit;
+  }
+
   @Override
   public String getType()
   {
-    return "kill";
+    return TYPE;
   }
 
   @Nonnull
@@ -140,6 +167,13 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
     return numBatchesProcessed;
   }
 
+  @JsonIgnore
+  @VisibleForTesting
+  long getNumSegmentsKilled()
+  {
+    return numSegmentsKilled;
+  }
+
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
@@ -153,27 +187,29 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
     }
 
     // List unused segments
-    final List<DataSegment> allUnusedSegments = toolbox
-        .getTaskActionClient()
-        .submit(new RetrieveUnusedSegmentsAction(getDataSource(), 
getInterval()));
-
-    final List<List<DataSegment>> unusedSegmentBatches = 
Lists.partition(allUnusedSegments, batchSize);
-
-    // The individual activities here on the toolbox have possibility to run 
for a longer period of time,
-    // since they involve calls to metadata storage and archival object 
storage. And, the tasks take hold of the
-    // task lockbox to run. By splitting the segment list into smaller 
batches, we have an opportunity to yield the
-    // lock to other activity that might need to happen using the overlord 
tasklockbox.
-
-    LOG.info("Running kill task[%s] for dataSource[%s] and interval[%s]. 
Killing total [%,d] unused segments in [%d] batches(batchSize = [%d]).",
-            getId(), getDataSource(), getInterval(), allUnusedSegments.size(), 
unusedSegmentBatches.size(), batchSize);
+    int nextBatchSize = computeNextBatchSize(numSegmentsKilled);
+    @Nullable Integer numTotalBatches = getNumTotalBatches();
+    List<DataSegment> unusedSegments;
+    LOG.info(
+        "Starting kill with batchSize[%d], up to limit[%d] segments will be 
deleted%s",
+        batchSize,
+        limit,
+        numTotalBatches != null ? StringUtils.format(" in([%d] batches]).", 
numTotalBatches) : "."
+    );
+    do {
+      if (nextBatchSize <= 0) {
+        break;
+      }
+      unusedSegments = toolbox
+          .getTaskActionClient()
+          .submit(new RetrieveUnusedSegmentsAction(getDataSource(), 
getInterval(), nextBatchSize));
 
-    for (final List<DataSegment> unusedSegments : unusedSegmentBatches) {
       if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
         throw new ISE(
-                "Locks[%s] for task[%s] can't cover segments[%s]",
-                
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
-                getId(),
-                unusedSegments
+            "Locks[%s] for task[%s] can't cover segments[%s]",
+            
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
+            getId(),
+            unusedSegments
         );
       }
 
@@ -186,19 +222,40 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
       toolbox.getTaskActionClient().submit(new SegmentNukeAction(new 
HashSet<>(unusedSegments)));
       toolbox.getDataSegmentKiller().kill(unusedSegments);
       numBatchesProcessed++;
+      numSegmentsKilled += unusedSegments.size();
 
-      if (numBatchesProcessed % 10 == 0) {
-        LOG.info("Processed [%d/%d] batches for kill task[%s].",
-                numBatchesProcessed, unusedSegmentBatches.size(), getId());
-      }
-    }
+      LOG.info("Processed [%d] batches for kill task[%s].", 
numBatchesProcessed, getId());
 
-    LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. 
Deleted total [%,d] unused segments in [%d] batches.",
-            getId(), getDataSource(), getInterval(), allUnusedSegments.size(), 
unusedSegmentBatches.size());
+      nextBatchSize = computeNextBatchSize(numSegmentsKilled);
+    } while (unusedSegments.size() != 0 && (null == numTotalBatches || 
numBatchesProcessed < numTotalBatches));
+
+    LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. 
Deleted total [%d] unused segments "
+             + "in [%d] batches.",
+        getId(),
+        getDataSource(),
+        getInterval(),
+        numSegmentsKilled,
+        numBatchesProcessed
+    );
 
     return TaskStatus.success(getId());
   }
 
+  @JsonIgnore
+  @VisibleForTesting
+  @Nullable
+  Integer getNumTotalBatches()
+  {
+    return null != limit ? (int) Math.ceil((double) limit / batchSize) : null;
+  }
+
+  @JsonIgnore
+  @VisibleForTesting
+  int computeNextBatchSize(int numSegmentsKilled)
+  {
+    return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : 
batchSize;
+  }
+
   private NavigableMap<DateTime, List<TaskLock>> 
getTaskLockMap(TaskActionClient client) throws IOException
   {
     final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
index 3e8b792eb8..d23b3820db 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
@@ -87,7 +87,7 @@ public class MoveTask extends AbstractFixedIntervalTask
     // List unused segments
     final List<DataSegment> unusedSegments = toolbox
         .getTaskActionClient()
-        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), 
myLock.getInterval()));
+        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), 
myLock.getInterval(), null));
 
     // Verify none of these segments have versions > lock version
     for (final DataSegment unusedSegment : unusedSegments) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
index 5e91ad3375..1364bcb597 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
@@ -80,7 +80,7 @@ public class RestoreTask extends AbstractFixedIntervalTask
     // List unused segments
     final List<DataSegment> unusedSegments = toolbox
         .getTaskActionClient()
-        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), 
myLock.getInterval()));
+        .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), 
myLock.getInterval(), null));
 
     // Verify none of these segments have versions > lock version
     for (final DataSegment unusedSegment : unusedSegments) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 5c3186ef39..58a2ad435b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -61,7 +61,7 @@ import java.util.Set;
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes(value = {
-    @Type(name = "kill", value = KillUnusedSegmentsTask.class),
+    @Type(name = KillUnusedSegmentsTask.TYPE, value = 
KillUnusedSegmentsTask.class),
     @Type(name = "move", value = MoveTask.class),
     @Type(name = "archive", value = ArchiveTask.class),
     @Type(name = "restore", value = RestoreTask.class),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
index 6149208fc3..2d360dfeb5 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
@@ -104,7 +104,7 @@ public class RetrieveSegmentsActionsTest
   @Test
   public void testRetrieveUnusedSegmentsAction()
   {
-    final RetrieveUnusedSegmentsAction action = new 
RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL);
+    final RetrieveUnusedSegmentsAction action = new 
RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null);
     final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, 
actionTestKit.getTaskActionToolbox()));
     Assert.assertEquals(expectedUnusedSegments, resultSegments);
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
index 70cd5fcf19..3ab6bae468 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
@@ -51,8 +51,9 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
         "killTaskId",
         "datasource",
         Intervals.of("2020-01-01/P1D"),
-        true,
-        99
+        false,
+        99,
+        5
     );
     final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
     final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) 
objectMapper.readValue(json, Task.class);
@@ -61,6 +62,8 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
     Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
     Assert.assertEquals(taskQuery.getMarkAsUnused(), 
fromJson.isMarkAsUnused());
     Assert.assertEquals(taskQuery.getBatchSize(), 
Integer.valueOf(fromJson.getBatchSize()));
+    Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());
+
   }
 
   @Test
@@ -71,6 +74,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
             "datasource",
             Intervals.of("2020-01-01/P1D"),
             true,
+            null,
             null
     );
     final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
@@ -80,6 +84,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
     Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
     Assert.assertEquals(taskQuery.getMarkAsUnused(), 
fromJson.isMarkAsUnused());
     Assert.assertEquals(100, fromJson.getBatchSize());
+    Assert.assertNull(taskQuery.getLimit());
   }
 
   @Test
@@ -91,7 +96,8 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
         Intervals.of("2020-01-01/P1D"),
         null,
         true,
-        99
+        99,
+        null
     );
     final byte[] json = objectMapper.writeValueAsBytes(task);
     final ClientKillUnusedSegmentsTaskQuery taskQuery = 
(ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
@@ -103,5 +109,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
     Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
     Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
     Assert.assertEquals(Integer.valueOf(task.getBatchSize()), 
taskQuery.getBatchSize());
+    Assert.assertNull(task.getLimit());
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index f57624ed7d..3a89c19a78 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -81,6 +81,7 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             Intervals.of("2019-03-01/2019-04-01"),
             null,
             false,
+            null,
             null
         );
 
@@ -97,7 +98,9 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
         newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
         newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
     );
-    Assert.assertEquals(1L, task.getNumBatchesProcessed());
+
+    Assert.assertEquals(2L, task.getNumBatchesProcessed());
+    Assert.assertEquals(1, task.getNumSegmentsKilled());
   }
 
 
@@ -128,6 +131,7 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             Intervals.of("2019-03-01/2019-04-01"),
             null,
             true,
+            null,
             null
         );
 
@@ -144,7 +148,8 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
         newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
         newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
     );
-    Assert.assertEquals(1L, task.getNumBatchesProcessed());
+    Assert.assertEquals(2L, task.getNumBatchesProcessed());
+    Assert.assertEquals(1, task.getNumSegmentsKilled());
   }
 
   @Test
@@ -157,13 +162,14 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             Intervals.of("2019-03-01/2019-04-01"),
             null,
             true,
+            null,
             null
         );
     Assert.assertTrue(task.getInputSourceResources().isEmpty());
   }
 
   @Test
-  public void testKillBatchSizeOne() throws Exception
+  public void testKillBatchSizeOneAndLimit4() throws Exception
   {
     final String version = DateTimes.nowUtc().toString();
     final Set<DataSegment> segments = ImmutableSet.of(
@@ -176,14 +182,23 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
 
     Assert.assertEquals(segments, announced);
 
+    Assert.assertEquals(
+        segments.size(),
+        getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+            DATA_SOURCE,
+            Intervals.of("2018-01-01/2020-01-01")
+        )
+    );
+
     final KillUnusedSegmentsTask task =
             new KillUnusedSegmentsTask(
                     null,
                     DATA_SOURCE,
                     Intervals.of("2018-01-01/2020-01-01"),
                     null,
-                    true,
-                    1
+                    false,
+                    1,
+                4
             );
 
     Assert.assertEquals(TaskState.SUCCESS, 
taskRunner.run(task).get().getStatusCode());
@@ -195,6 +210,7 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
 
     Assert.assertEquals(Collections.emptyList(), unusedSegments);
     Assert.assertEquals(4L, task.getNumBatchesProcessed());
+    Assert.assertEquals(4, task.getNumSegmentsKilled());
   }
 
   @Test
@@ -218,7 +234,8 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
                     Intervals.of("2018-01-01/2020-01-01"),
                     null,
                     true,
-                    3
+                    3,
+                null
             );
 
     Assert.assertEquals(TaskState.SUCCESS, 
taskRunner.run(task).get().getStatusCode());
@@ -229,7 +246,120 @@ public class KillUnusedSegmentsTaskTest extends 
IngestionTestBase
             
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, 
Intervals.of("2019/2020"));
 
     Assert.assertEquals(Collections.emptyList(), unusedSegments);
-    Assert.assertEquals(2L, task.getNumBatchesProcessed());
+    Assert.assertEquals(3L, task.getNumBatchesProcessed());
+    Assert.assertEquals(4, task.getNumSegmentsKilled());
+  }
+
+  @Test
+  public void testComputeNextBatchSizeDefault()
+  {
+    final KillUnusedSegmentsTask task =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2018-01-01/2020-01-01"),
+            null,
+            false,
+            null,
+            null
+        );
+    Assert.assertEquals(100, task.computeNextBatchSize(50));
+  }
+
+  @Test
+  public void testComputeNextBatchSizeWithBatchSizeLargerThanLimit()
+  {
+    final KillUnusedSegmentsTask task =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2018-01-01/2020-01-01"),
+            null,
+            false,
+            10,
+            5
+        );
+    Assert.assertEquals(5, task.computeNextBatchSize(0));
+  }
+
+  @Test
+  public void testComputeNextBatchSizeWithBatchSizeSmallerThanLimit()
+  {
+    final KillUnusedSegmentsTask task =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2018-01-01/2020-01-01"),
+            null,
+            false,
+            5,
+            10
+        );
+    Assert.assertEquals(5, task.computeNextBatchSize(0));
+  }
+
+  @Test
+  public void testComputeNextBatchSizeWithRemainingLessThanLimit()
+  {
+    final KillUnusedSegmentsTask task =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2018-01-01/2020-01-01"),
+            null,
+            false,
+            5,
+            10
+        );
+    Assert.assertEquals(3, task.computeNextBatchSize(7));
+  }
+
+  @Test
+  public void testGetNumTotalBatchesDefault()
+  {
+    final KillUnusedSegmentsTask task =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2018-01-01/2020-01-01"),
+            null,
+            false,
+            null,
+            null
+        );
+    Assert.assertNull(task.getNumTotalBatches());
+  }
+
+  @Test
+  public void testGetNumTotalBatchesWithBatchSizeLargerThanLimit()
+  {
+    final KillUnusedSegmentsTask task =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2018-01-01/2020-01-01"),
+            null,
+            false,
+            10,
+            5
+        );
+    Assert.assertEquals(1, (int) task.getNumTotalBatches());
+  }
+
+  @Test
+  public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit()
+  {
+    final KillUnusedSegmentsTask task =
+        new KillUnusedSegmentsTask(
+            null,
+            DATA_SOURCE,
+            Intervals.of("2018-01-01/2020-01-01"),
+            null,
+            false,
+            5,
+            10
+        );
+    Assert.assertEquals(2, (int) task.getNumTotalBatches());
   }
 
   private static DataSegment newSegment(Interval interval, String version)
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 2ea7fcedf1..31ec645b3f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -949,6 +949,7 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
             Intervals.of("2011-04-01/P4D"),
             null,
             false,
+            null,
             null
         );
 
@@ -957,7 +958,7 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
     Assert.assertEquals("merged statusCode", TaskState.SUCCESS, 
status.getStatusCode());
     Assert.assertEquals("num segments published", 0, 
mdc.getPublished().size());
     Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size());
-    Assert.assertEquals("delete segment batch call count", 1, 
mdc.getDeleteSegmentsCount());
+    Assert.assertEquals("delete segment batch call count", 2, 
mdc.getDeleteSegmentsCount());
     Assert.assertTrue(
         "expected unused segments get killed",
         expectedUnusedSegments.containsAll(mdc.getNuked()) && 
mdc.getNuked().containsAll(
@@ -970,6 +971,104 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
     }
   }
 
+  @Test
+  public void testKillUnusedSegmentsTaskWithMaxSegmentsToKill() throws 
Exception
+  {
+    final File tmpSegmentDir = temporaryFolder.newFolder();
+
+    List<DataSegment> expectedUnusedSegments = Lists.transform(
+        ImmutableList.of(
+            "2011-04-01/2011-04-02",
+            "2011-04-02/2011-04-03",
+            "2011-04-04/2011-04-05"
+        ), new Function<String, DataSegment>()
+        {
+          @Override
+          public DataSegment apply(String input)
+          {
+            final Interval interval = Intervals.of(input);
+            try {
+              return DataSegment.builder()
+                  .dataSource("test_kill_task")
+                  .interval(interval)
+                  .loadSpec(
+                      ImmutableMap.of(
+                          "type",
+                          "local",
+                          "path",
+                          tmpSegmentDir.getCanonicalPath()
+                          + "/druid/localStorage/wikipedia/"
+                          + interval.getStart()
+                          + "-"
+                          + interval.getEnd()
+                          + "/"
+                          + "2011-04-6T16:52:46.119-05:00"
+                          + "/0/index.zip"
+                      )
+                  )
+                  .version("2011-04-6T16:52:46.119-05:00")
+                  .dimensions(ImmutableList.of())
+                  .metrics(ImmutableList.of())
+                  .shardSpec(NoneShardSpec.instance())
+                  .binaryVersion(9)
+                  .size(0)
+                  .build();
+            }
+            catch (IOException e) {
+              throw new ISE(e, "Error creating segments");
+            }
+          }
+        }
+    );
+
+    mdc.setUnusedSegments(expectedUnusedSegments);
+
+    // manually create local segments files
+    List<File> segmentFiles = new ArrayList<>();
+    for (DataSegment segment : 
mdc.retrieveUnusedSegmentsForInterval("test_kill_task", 
Intervals.of("2011-04-01/P4D"))) {
+      File file = new File((String) segment.getLoadSpec().get("path"));
+      FileUtils.mkdirp(file.getParentFile());
+      Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY);
+      segmentFiles.add(file);
+    }
+
+    final int maxSegmentsToKill = 2;
+    final Task killUnusedSegmentsTask =
+        new KillUnusedSegmentsTask(
+            null,
+            "test_kill_task",
+            Intervals.of("2011-04-01/P4D"),
+            null,
+            false,
+            null,
+            maxSegmentsToKill
+        );
+
+    final TaskStatus status = runTask(killUnusedSegmentsTask);
+    Assert.assertEquals(taskLocation, status.getLocation());
+    Assert.assertEquals("merged statusCode", TaskState.SUCCESS, 
status.getStatusCode());
+    Assert.assertEquals("num segments published", 0, 
mdc.getPublished().size());
+    Assert.assertEquals("num segments nuked", maxSegmentsToKill, 
mdc.getNuked().size());
+    Assert.assertTrue(
+        "expected unused segments get killed",
+        expectedUnusedSegments.containsAll(mdc.getNuked())
+    );
+
+    int expectedNumOfSegmentsRemaining = segmentFiles.size() - 
maxSegmentsToKill;
+    int actualNumOfSegmentsRemaining = 0;
+    for (File file : segmentFiles) {
+      if (file.exists()) {
+        actualNumOfSegmentsRemaining++;
+      }
+    }
+
+    Assert.assertEquals(
+        "Expected number of segments deleted did not match expectations",
+        expectedNumOfSegmentsRemaining,
+        actualNumOfSegmentsRemaining
+    );
+  }
+
   @Test
   public void testRealtimeishTask() throws Exception
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index c8bf8fd28a..66af0a9730 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -42,6 +42,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Stream;
 
 public class TestIndexerMetadataStorageCoordinator implements 
IndexerMetadataStorageCoordinator
 {
@@ -110,6 +111,22 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
     }
   }
 
+  @Override
+  public List<DataSegment> retrieveUnusedSegmentsForInterval(String 
dataSource, Interval interval, @Nullable Integer limit)
+  {
+    synchronized (unusedSegments) {
+      Stream<DataSegment> resultStream = unusedSegments.stream();
+
+      resultStream = resultStream.filter(ds -> !nuked.contains(ds));
+
+      if (limit != null) {
+        resultStream = resultStream.limit(limit);
+      }
+
+      return ImmutableList.copyOf(resultStream.iterator());
+    }
+  }
+
   @Override
   public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval 
interval)
   {
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
index 779c8acc7f..3676d68409 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
+
 import java.util.Objects;
 
 /**
@@ -40,14 +42,16 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
   private final Interval interval;
   private final Boolean markAsUnused;
   private final Integer batchSize;
+  @Nullable private final Integer limit;
 
   @JsonCreator
   public ClientKillUnusedSegmentsTaskQuery(
       @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
-      @JsonProperty("markAsUnused") Boolean markAsUnused,
-      @JsonProperty("batchSize") Integer batchSize
+      @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
+      @JsonProperty("batchSize") Integer batchSize,
+      @JsonProperty("limit") Integer limit
   )
   {
     this.id = Preconditions.checkNotNull(id, "id");
@@ -55,6 +59,8 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
     this.interval = interval;
     this.markAsUnused = markAsUnused;
     this.batchSize = batchSize;
+    Preconditions.checkArgument(limit == null || limit > 0, "limit must be > 
0");
+    this.limit = limit;
   }
 
   @JsonProperty
@@ -96,6 +102,14 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
     return batchSize;
   }
 
+  @JsonProperty
+  @Nullable
+  public Integer getLimit()
+  {
+    return limit;
+  }
+
+
   @Override
   public boolean equals(Object o)
   {
@@ -110,12 +124,13 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
            && Objects.equals(dataSource, that.dataSource)
            && Objects.equals(interval, that.interval)
            && Objects.equals(markAsUnused, that.markAsUnused)
-           && Objects.equals(batchSize, that.batchSize);
+           && Objects.equals(batchSize, that.batchSize)
+           && Objects.equals(limit, that.limit);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(id, dataSource, interval, markAsUnused, batchSize);
+    return Objects.hash(id, dataSource, interval, markAsUnused, batchSize, 
limit);
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index b3c70f0cdb..3d8f4b8586 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -123,17 +123,30 @@ public interface IndexerMetadataStorageCoordinator
       Segments visibility
   );
 
+  /**
+   * see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)}
+   */
+  default List<DataSegment> retrieveUnusedSegmentsForInterval(String 
dataSource, Interval interval)
+  {
+    return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
+  }
+
   /**
    * Retrieve all published segments which include ONLY data within the given 
interval and are marked as unused from the
    * metadata store.
    *
-   * @param dataSource The data source the segments belong to
-   * @param interval   Filter the data segments to ones that include data in 
this interval exclusively.
+   * @param dataSource  The data source the segments belong to
+   * @param interval    Filter the data segments to ones that include data in 
this interval exclusively.
+   * @param limit The maximum number of unused segments to retrieve. If null, 
no limit is applied.
    *
    * @return DataSegments which include ONLY data within the requested 
interval and are marked as unused. Segments NOT
    * returned here may include data in the interval
    */
-  List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, 
Interval interval);
+  List<DataSegment> retrieveUnusedSegmentsForInterval(
+      String dataSource,
+      Interval interval,
+      @Nullable Integer limit
+  );
 
   /**
    * Mark as unused segments which include ONLY data within the given interval.
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 107dc4b9f9..6c4d523133 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -190,12 +190,22 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
 
   @Override
   public List<DataSegment> retrieveUnusedSegmentsForInterval(final String 
dataSource, final Interval interval)
+  {
+    return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
+  }
+
+  @Override
+  public List<DataSegment> retrieveUnusedSegmentsForInterval(
+      String dataSource,
+      Interval interval,
+      @Nullable Integer limit
+  )
   {
     final List<DataSegment> matchingSegments = connector.inReadOnlyTransaction(
         (handle, status) -> {
           try (final CloseableIterator<DataSegment> iterator =
                    SqlSegmentsMetadataQuery.forHandle(handle, connector, 
dbTables, jsonMapper)
-                                           .retrieveUnusedSegments(dataSource, 
Collections.singletonList(interval))) {
+                       .retrieveUnusedSegments(dataSource, 
Collections.singletonList(interval), limit)) {
             return ImmutableList.copyOf(iterator);
           }
         }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index d98c5d4d08..46db26a56d 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -574,7 +574,7 @@ public class SqlSegmentsMetadataManager implements 
SegmentsMetadataManager
           }
 
           try (final CloseableIterator<DataSegment> iterator =
-                   queryTool.retrieveUnusedSegments(dataSourceName, 
intervals)) {
+                   queryTool.retrieveUnusedSegments(dataSourceName, intervals, 
null)) {
             while (iterator.hasNext()) {
               final DataSegment dataSegment = iterator.next();
               timeline.addSegments(Iterators.singletonIterator(dataSegment));
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 13460e2695..45896a865e 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -37,6 +37,8 @@ import org.skife.jdbi.v2.PreparedBatch;
 import org.skife.jdbi.v2.Query;
 import org.skife.jdbi.v2.ResultIterator;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -104,7 +106,7 @@ public class SqlSegmentsMetadataQuery
       final Collection<Interval> intervals
   )
   {
-    return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, 
true);
+    return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, 
true, null);
   }
 
   /**
@@ -118,10 +120,11 @@ public class SqlSegmentsMetadataQuery
    */
   public CloseableIterator<DataSegment> retrieveUnusedSegments(
       final String dataSource,
-      final Collection<Interval> intervals
+      final Collection<Interval> intervals,
+      @Nullable final Integer limit
   )
   {
-    return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, 
false);
+    return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, 
false, limit);
   }
 
   /**
@@ -201,7 +204,7 @@ public class SqlSegmentsMetadataQuery
       // Retrieve, then drop, since we can't write a WHERE clause directly.
       final List<SegmentId> segments = ImmutableList.copyOf(
           Iterators.transform(
-              retrieveSegments(dataSource, 
Collections.singletonList(interval), IntervalMode.CONTAINS, true),
+              retrieveSegments(dataSource, 
Collections.singletonList(interval), IntervalMode.CONTAINS, true, null),
               DataSegment::getId
           )
       );
@@ -213,7 +216,8 @@ public class SqlSegmentsMetadataQuery
       final String dataSource,
       final Collection<Interval> intervals,
       final IntervalMode matchMode,
-      final boolean used
+      final boolean used,
+      @Nullable final Integer limit
   )
   {
     // Check if the intervals all support comparing as strings. If so, bake 
them into the SQL.
@@ -259,6 +263,9 @@ public class SqlSegmentsMetadataQuery
         .setFetchSize(connector.getStreamingFetchSize())
         .bind("used", used)
         .bind("dataSource", dataSource);
+    if (null != limit) {
+      sql.setMaxRows(limit);
+    }
 
     if (compareAsString) {
       final Iterator<Interval> iterator = intervals.iterator();
diff --git 
a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java 
b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index 51b4323a11..b4391dc0a8 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -84,9 +84,40 @@ public interface OverlordClient
    * @return future with task ID
    */
   default ListenableFuture<String> runKillTask(String idPrefix, String 
dataSource, Interval interval)
+  {
+    return runKillTask(idPrefix, dataSource, interval, null);
+  }
+
+  /**
+   * Run a "kill" task for a particular datasource and interval. Shortcut to 
{@link #runTask(String, Object)}.
+   *
+   * The kill task deletes all unused segment records from deep storage and 
the metadata store. The task runs
+   * asynchronously after the API call returns. The resolved future is the ID 
of the task, which can be used to
+   * monitor its progress through the {@link #taskStatus(String)} API.
+   *
+   * @param idPrefix   Descriptive prefix to include at the start of task IDs
+   * @param dataSource Datasource to kill
+   * @param interval   Interval to kill
+   * @param maxSegmentsToKill  The maximum number of segments to kill
+   *
+   * @return future with task ID
+   */
+  default ListenableFuture<String> runKillTask(
+      String idPrefix,
+      String dataSource,
+      Interval interval,
+      @Nullable Integer maxSegmentsToKill
+  )
   {
     final String taskId = IdUtils.newTaskId(idPrefix, 
ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval);
-    final ClientTaskQuery taskQuery = new 
ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false, null);
+    final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(
+        taskId,
+        dataSource,
+        interval,
+        false,
+        null,
+        maxSegmentsToKill
+    );
     return FutureUtils.transform(runTask(taskId, taskQuery), ignored -> 
taskId);
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index f4b6240b43..205947b003 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -33,6 +33,8 @@ import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.List;
 
@@ -130,7 +132,12 @@ public class KillUnusedSegments implements CoordinatorDuty
       }
 
       try {
-        
FutureUtils.getUnchecked(overlordClient.runKillTask("coordinator-issued", 
dataSource, intervalToKill), true);
+        FutureUtils.getUnchecked(overlordClient.runKillTask(
+            "coordinator-issued",
+            dataSource,
+            intervalToKill,
+            maxSegmentsToKill
+        ), true);
         ++submittedTasks;
       }
       catch (Exception ex) {
@@ -148,6 +155,7 @@ public class KillUnusedSegments implements CoordinatorDuty
   /**
    * Calculates the interval for which segments are to be killed in a 
datasource.
    */
+  @Nullable
   private Interval findIntervalForKill(String dataSource)
   {
     final DateTime maxEndTime = ignoreRetainDuration
diff --git 
a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java 
b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 4a00f61300..1e146736da 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -342,7 +342,7 @@ public class DataSourcesResource
     }
     final Interval theInterval = Intervals.of(interval.replace('_', '/'));
     try {
-      FutureUtils.getUnchecked(overlordClient.runKillTask("api-issued", 
dataSourceName, theInterval), true);
+      FutureUtils.getUnchecked(overlordClient.runKillTask("api-issued", 
dataSourceName, theInterval, null), true);
       return Response.ok().build();
     }
     catch (Exception e) {
diff --git 
a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
 
b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
index af9b2c8ec1..60edff9307 100644
--- 
a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
@@ -35,13 +35,21 @@ public class ClientKillUnusedSegmentsTaskQueryTest
   private static final Interval INTERVAL = new Interval(START, START.plus(1));
   private static final Boolean MARK_UNUSED = true;
   private static final Integer BATCH_SIZE = 999;
+  private static final Integer LIMIT = 1000;
 
   ClientKillUnusedSegmentsTaskQuery clientKillUnusedSegmentsQuery;
 
   @Before
   public void setUp()
   {
-    clientKillUnusedSegmentsQuery = new 
ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL, true, 
BATCH_SIZE);
+    clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery(
+        "killTaskId",
+        DATA_SOURCE,
+        INTERVAL,
+        true,
+        BATCH_SIZE,
+        LIMIT
+    );
   }
 
   @After
@@ -80,12 +88,18 @@ public class ClientKillUnusedSegmentsTaskQueryTest
     Assert.assertEquals(BATCH_SIZE, 
clientKillUnusedSegmentsQuery.getBatchSize());
   }
 
+  @Test
+  public void testGetLimit()
+  {
+    Assert.assertEquals(LIMIT, clientKillUnusedSegmentsQuery.getLimit());
+  }
+
   @Test
   public void testEquals()
   {
     EqualsVerifier.forClass(ClientKillUnusedSegmentsTaskQuery.class)
                   .usingGetClass()
-                  .withNonnullFields("id", "dataSource", "interval", 
"batchSize")
+                  .withNonnullFields("id", "dataSource", "interval", 
"batchSize", "limit")
                   .verify();
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 74e06bfb66..444e159411 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -932,6 +932,22 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
   }
 
+  @Test
+  public void testSimpleUnusedListWithLimit() throws IOException
+  {
+    coordinator.announceHistoricalSegments(SEGMENTS);
+    markAllSegmentsUnused();
+    int limit = SEGMENTS.size() - 1;
+    Set<DataSegment> retrievedUnusedSegments = ImmutableSet.copyOf(
+        coordinator.retrieveUnusedSegmentsForInterval(
+            defaultSegment.getDataSource(),
+            defaultSegment.getInterval(),
+            limit
+        )
+    );
+    Assert.assertEquals(limit, retrievedUnusedSegments.size());
+    Assert.assertTrue(SEGMENTS.containsAll(retrievedUnusedSegments));
+  }
 
   @Test
   public void testUsedOverlapLow() throws IOException
diff --git 
a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
 
b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
index c1e48c496c..5b9a88d584 100644
--- 
a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
+++ 
b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
@@ -422,7 +422,14 @@ public class OverlordClientImplTest
   public void test_taskPayload() throws ExecutionException, 
InterruptedException, JsonProcessingException
   {
     final String taskID = "taskId_1";
-    final ClientTaskQuery clientTaskQuery = new 
ClientKillUnusedSegmentsTaskQuery(taskID, "test", null, null, null);
+    final ClientTaskQuery clientTaskQuery = new 
ClientKillUnusedSegmentsTaskQuery(
+        taskID,
+        "test",
+        null,
+        null,
+        null,
+        null
+    );
 
     serviceClient.expectAndRespond(
         new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/" + taskID),
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 8f43b36bbe..039174eac7 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -216,11 +216,13 @@ public class KillUnusedSegmentsTest
 
   private void runAndVerifyKillInterval(Interval expectedKillInterval)
   {
+    int limit = config.getCoordinatorKillMaxSegments();
     target.run(params);
     Mockito.verify(overlordClient, Mockito.times(1)).runKillTask(
         ArgumentMatchers.anyString(),
         ArgumentMatchers.eq("DS1"),
-        ArgumentMatchers.eq(expectedKillInterval)
+        ArgumentMatchers.eq(expectedKillInterval),
+        ArgumentMatchers.eq(limit)
     );
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
 
b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
index 5dec3154d6..5511514585 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
@@ -593,7 +593,7 @@ public class DataSourcesResourceTest
     Interval theInterval = Intervals.of(interval.replace('_', '/'));
 
     OverlordClient overlordClient = 
EasyMock.createStrictMock(OverlordClient.class);
-    EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", 
theInterval))
+    EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", 
theInterval, null))
             .andReturn(Futures.immediateFuture(null));
     EasyMock.replay(overlordClient, server);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to