zachjsh commented on code in PR #14662:
URL: https://github.com/apache/druid/pull/14662#discussion_r1283396511


##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -190,12 +190,22 @@ public List<Pair<DataSegment, String>> 
retrieveUsedSegmentsAndCreatedDates(Strin
 
   @Override
   public List<DataSegment> retrieveUnusedSegmentsForInterval(final String 
dataSource, final Interval interval)
+  {
+    return retrieveUnusedSegmentsForInterval(dataSource, interval, null);

Review Comment:
   fixed



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -123,17 +123,30 @@ Collection<DataSegment> retrieveUsedSegmentsForIntervals(
       Segments visibility
   );
 
+  /**
+   * see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)}
+   */
+  List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, 
Interval interval);
+
   /**
    * Retrieve all published segments which include ONLY data within the given 
interval and are marked as unused from the
    * metadata store.
    *
-   * @param dataSource The data source the segments belong to
-   * @param interval   Filter the data segments to ones that include data in 
this interval exclusively.
+   * @param dataSource  The data source the segments belong to
+   * @param interval    Filter the data segments to ones that include data in 
this interval exclusively.
+   * @param maxSegments The maximum number of unused segments to retrieve. If 
null, no limit is applied.
    *
    * @return DataSegments which include ONLY data within the requested 
interval and are marked as unused. Segments NOT
    * returned here may include data in the interval
    */
-  List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, 
Interval interval);
+  default List<DataSegment> retrieveUnusedSegmentsForInterval(
+      String dataSource,
+      Interval interval,
+      @Nullable Integer maxSegments

Review Comment:
   fixed



##########
server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java:
##########
@@ -40,21 +42,27 @@ public class ClientKillUnusedSegmentsTaskQuery implements 
ClientTaskQuery
   private final Interval interval;
   private final Boolean markAsUnused;
   private final Integer batchSize;
+  @Nullable private final Integer limit;
 
   @JsonCreator
   public ClientKillUnusedSegmentsTaskQuery(
       @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
-      @JsonProperty("markAsUnused") Boolean markAsUnused,
-      @JsonProperty("batchSize") Integer batchSize
+      @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
+      @JsonProperty("batchSize") Integer batchSize,
+      @JsonProperty("limit") Integer limit
   )
   {
     this.id = Preconditions.checkNotNull(id, "id");
     this.dataSource = dataSource;
     this.interval = interval;
     this.markAsUnused = markAsUnused;
     this.batchSize = batchSize;
+    if (null != limit) {
+      Preconditions.checkArgument(limit > 0, "limit must be > 0");

Review Comment:
   fixed



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -186,19 +212,48 @@ public TaskStatus runTask(TaskToolbox toolbox) throws 
Exception
       toolbox.getTaskActionClient().submit(new SegmentNukeAction(new 
HashSet<>(unusedSegments)));
       toolbox.getDataSegmentKiller().kill(unusedSegments);
       numBatchesProcessed++;
+      numSegmentsKilled += unusedSegments.size();
 
       if (numBatchesProcessed % 10 == 0) {
-        LOG.info("Processed [%d/%d] batches for kill task[%s].",
-                numBatchesProcessed, unusedSegmentBatches.size(), getId());
+        if (null != numTotalBatches) {
+          LOG.info("Processed [%d/%d] batches for kill task[%s].",
+              numBatchesProcessed, numTotalBatches, getId()
+          );
+        } else {
+          LOG.info("Processed [%d] batches for kill task[%s].", 
numBatchesProcessed, getId());
+        }
       }
-    }
 
-    LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. 
Deleted total [%,d] unused segments in [%d] batches.",
-            getId(), getDataSource(), getInterval(), allUnusedSegments.size(), 
unusedSegmentBatches.size());
+      nextBatchSize = computeNextBatchSize(numSegmentsKilled);
+    } while (unusedSegments.size() != 0 && (null == numTotalBatches || 
numBatchesProcessed < numTotalBatches));
+
+    LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. 
Deleted total [%d] unused segments "
+             + "in [%d] batches.",
+        getId(),
+        getDataSource(),
+        getInterval(),
+        numSegmentsKilled,
+        numBatchesProcessed
+    );
 
     return TaskStatus.success(getId());
   }
 
+  @JsonIgnore
+  @VisibleForTesting
+  @Nullable
+  Integer getNumTotalBatches()
+  {
+    return null != limit ? (int) Math.ceil((double) limit / batchSize) : null;
+  }
+
+  @JsonIgnore
+  @VisibleForTesting
+  int computeNextBatchSize(int numSegmentsKilled)
+  {
+    return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : 
batchSize;

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to