jasonk000 commented on code in PR #14662:
URL: https://github.com/apache/druid/pull/14662#discussion_r1282525925


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -186,19 +212,48 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
       toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
       toolbox.getDataSegmentKiller().kill(unusedSegments);
       numBatchesProcessed++;
+      numSegmentsKilled += unusedSegments.size();
 
       if (numBatchesProcessed % 10 == 0) {
-        LOG.info("Processed [%d/%d] batches for kill task[%s].",
-                numBatchesProcessed, unusedSegmentBatches.size(), getId());
+        if (null != numTotalBatches) {
+          LOG.info("Processed [%d/%d] batches for kill task[%s].",
+              numBatchesProcessed, numTotalBatches, getId()
+          );
+        } else {
+          LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());
+        }
       }
-    }
 
-    LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%,d] unused segments in [%d] batches.",
-            getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size());
+      nextBatchSize = computeNextBatchSize(numSegmentsKilled);
+    } while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));
+
+    LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%d] unused segments "
+             + "in [%d] batches.",
+        getId(),
+        getDataSource(),
+        getInterval(),
+        numSegmentsKilled,
+        numBatchesProcessed
+    );
 
     return TaskStatus.success(getId());
   }
 
+  @JsonIgnore
+  @VisibleForTesting
+  @Nullable
+  Integer getNumTotalBatches()
+  {
+    return null != limit ? (int) Math.ceil((double) limit / batchSize) : null;
+  }
+
+  @JsonIgnore
+  @VisibleForTesting
+  int computeNextBatchSize(int numSegmentsKilled)
+  {
+    return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;

Review Comment:
   I think we need a guard here, and/or a test, to ensure the result is always 
>= 1.
   
   Paper scenario: `limit = 100`, `batchSize = 999999`. After the first 
iteration, `numSegmentsKilled` is 100, so `computeNextBatchSize(100)` returns 
`0`, and `setMaxRows(0)` means `unlimited`. I think this is unintended.
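
   To make the paper scenario concrete, here is a minimal standalone sketch. 
It copies the expression from the diff into a hypothetical class 
(`BatchSizeSketch`), passing `limit` and `batchSize` as parameters instead of 
fields; `hasRemaining` is only one possible caller-side guard, not something 
the PR defines.

```java
final class BatchSizeSketch
{
  // Same expression as in the diff; limit == null means "no limit".
  // Passing limit and batchSize as parameters is an assumption made for this sketch.
  static int computeNextBatchSize(Integer limit, int batchSize, int numSegmentsKilled)
  {
    return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;
  }

  // Hypothetical guard: only ask for another batch while the limit is not yet reached.
  static boolean hasRemaining(Integer limit, int numSegmentsKilled)
  {
    return null == limit || numSegmentsKilled < limit;
  }

  public static void main(String[] args)
  {
    // Before the first batch: the limit caps the oversized batchSize.
    System.out.println(computeNextBatchSize(100, 999999, 0));
    // After 100 segments were killed: the result drops to 0.
    System.out.println(computeNextBatchSize(100, 999999, 100));
    // The guard reports that nothing is left to fetch.
    System.out.println(hasRemaining(100, 100));
  }
}
```

   A test along these lines would pin down the `>= 1` expectation whenever the 
guard says another batch is needed.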



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -186,19 +212,48 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
       toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
       toolbox.getDataSegmentKiller().kill(unusedSegments);
       numBatchesProcessed++;
+      numSegmentsKilled += unusedSegments.size();
 
       if (numBatchesProcessed % 10 == 0) {
-        LOG.info("Processed [%d/%d] batches for kill task[%s].",
-                numBatchesProcessed, unusedSegmentBatches.size(), getId());
+        if (null != numTotalBatches) {
+          LOG.info("Processed [%d/%d] batches for kill task[%s].",
+              numBatchesProcessed, numTotalBatches, getId()
+          );
+        } else {

Review Comment:
   I think we can remove this `if()` branch by logging once before the loop, 
something like `Starting kill with batchSize[%d], up to limit[%d] segments 
will be deleted ([%d] batches)`, which simplifies the inner loop.
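
   A rough sketch of that shape, using plain `String.format` as a stand-in for 
the task's logger. The class and method names (`KillLogSketch`, 
`startMessage`, `progressMessage`) are hypothetical. Note that a null `limit` 
is rendered as `null` by `%d`, so the wording may need adjusting for the 
unlimited case.

```java
import java.util.Locale;

final class KillLogSketch
{
  // Logged once before the loop; carries the totals so the loop does not need them.
  static String startMessage(int batchSize, Integer limit, Integer numTotalBatches)
  {
    return String.format(
        Locale.ROOT,
        "Starting kill with batchSize[%d], up to limit[%d] segments will be deleted ([%d] batches)",
        batchSize,
        limit,
        numTotalBatches
    );
  }

  // Logged inside the loop; a single form, no branch on numTotalBatches.
  static String progressMessage(int numBatchesProcessed, String taskId)
  {
    return String.format(
        Locale.ROOT,
        "Processed [%d] batches for kill task[%s].",
        numBatchesProcessed,
        taskId
    );
  }

  public static void main(String[] args)
  {
    System.out.println(startMessage(1000, 100, 1));
    System.out.println(startMessage(1000, null, null));
    System.out.println(progressMessage(10, "kill_task_example"));
  }
}
```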



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -190,12 +190,22 @@ public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(Strin
 
   @Override
   public List<DataSegment> retrieveUnusedSegmentsForInterval(final String dataSource, final Interval interval)
+  {
+    return retrieveUnusedSegmentsForInterval(dataSource, interval, null);

Review Comment:
   ... like this one does.



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -123,17 +123,30 @@ Collection<DataSegment> retrieveUsedSegmentsForIntervals(
       Segments visibility
   );
 
+  /**
+   * see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)}
+   */
+  List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval);
+
   /**
    * Retrieve all published segments which include ONLY data within the given interval and are marked as unused from the
    * metadata store.
    *
-   * @param dataSource The data source the segments belong to
-   * @param interval   Filter the data segments to ones that include data in this interval exclusively.
+   * @param dataSource  The data source the segments belong to
+   * @param interval    Filter the data segments to ones that include data in this interval exclusively.
+   * @param maxSegments The maximum number of unused segments to retrieve. If null, no limit is applied.
    *
    * @return DataSegments which include ONLY data within the requested interval and are marked as unused. Segments NOT
    * returned here may include data in the interval
    */
-  List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval);
+  default List<DataSegment> retrieveUnusedSegmentsForInterval(
+      String dataSource,
+      Interval interval,
+      @Nullable Integer maxSegments

Review Comment:
   This behaviour is a little confusing to me, since the `default` 
implementation does not fulfill the contract. Perhaps it should be reversed 
instead: provide a `default` for `retrieveUnusedSegmentsForInterval(String, 
Interval)` that calls `retrieveUnusedSegmentsForInterval(String, Interval, 
null)`.
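
   Roughly what I have in mind, as a self-contained sketch. `String` stands in 
for both `DataSegment` and `Interval`, and `CoordinatorSketch` and 
`InMemoryCoordinatorSketch` are hypothetical names, so this only illustrates 
the direction of the delegation, not the real interface.

```java
import java.util.Arrays;
import java.util.List;

final class DefaultOverloadSketch
{
  public static void main(String[] args)
  {
    CoordinatorSketch coordinator = new InMemoryCoordinatorSketch(Arrays.asList("seg1", "seg2", "seg3"));
    // Two-argument form: delegates with maxSegments == null, so no limit is applied.
    System.out.println(coordinator.retrieveUnusedSegmentsForInterval("ds", "2023/2024"));
    // Three-argument form: the implementation applies the limit.
    System.out.println(coordinator.retrieveUnusedSegmentsForInterval("ds", "2023/2024", 2));
  }
}

// Stand-in for the coordinator interface; segments and intervals are plain strings here.
interface CoordinatorSketch
{
  // The convenience overload is the default and simply delegates.
  default List<String> retrieveUnusedSegmentsForInterval(String dataSource, String interval)
  {
    return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
  }

  // The limit-aware overload stays abstract, so every implementation must honour maxSegments.
  List<String> retrieveUnusedSegmentsForInterval(String dataSource, String interval, Integer maxSegments);
}

final class InMemoryCoordinatorSketch implements CoordinatorSketch
{
  private final List<String> unusedSegments;

  InMemoryCoordinatorSketch(List<String> unusedSegments)
  {
    this.unusedSegments = unusedSegments;
  }

  @Override
  public List<String> retrieveUnusedSegmentsForInterval(String dataSource, String interval, Integer maxSegments)
  {
    if (maxSegments == null) {
      return unusedSegments;
    }
    return unusedSegments.subList(0, Math.min(maxSegments, unusedSegments.size()));
  }
}
```

   With this arrangement an implementation cannot silently ignore 
`maxSegments`, because the only method it inherits for free is the one without 
a limit.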



##########
server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java:
##########
@@ -40,21 +42,27 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
   private final Interval interval;
   private final Boolean markAsUnused;
   private final Integer batchSize;
+  @Nullable private final Integer limit;
 
   @JsonCreator
   public ClientKillUnusedSegmentsTaskQuery(
       @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
-      @JsonProperty("markAsUnused") Boolean markAsUnused,
-      @JsonProperty("batchSize") Integer batchSize
+      @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
+      @JsonProperty("batchSize") Integer batchSize,
+      @JsonProperty("limit") Integer limit
   )
   {
     this.id = Preconditions.checkNotNull(id, "id");
     this.dataSource = dataSource;
     this.interval = interval;
     this.markAsUnused = markAsUnused;
     this.batchSize = batchSize;
+    if (null != limit) {
+      Preconditions.checkArgument(limit > 0, "limit must be > 0");

Review Comment:
   nit: This could probably be `checkArgument(limit == null || limit > 0, ...)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

