jasonk000 commented on code in PR #14662:
URL: https://github.com/apache/druid/pull/14662#discussion_r1282525925
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -186,19 +212,48 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);
numBatchesProcessed++;
+ numSegmentsKilled += unusedSegments.size();
if (numBatchesProcessed % 10 == 0) {
- LOG.info("Processed [%d/%d] batches for kill task[%s].",
- numBatchesProcessed, unusedSegmentBatches.size(), getId());
+ if (null != numTotalBatches) {
+ LOG.info("Processed [%d/%d] batches for kill task[%s].",
+ numBatchesProcessed, numTotalBatches, getId()
+ );
+ } else {
+ LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());
+ }
}
- }
- LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%,d] unused segments in [%d] batches.",
- getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size());
+ nextBatchSize = computeNextBatchSize(numSegmentsKilled);
+ } while (unusedSegments.size() != 0 && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));
+
+ LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%d] unused segments "
+ + "in [%d] batches.",
+ getId(),
+ getDataSource(),
+ getInterval(),
+ numSegmentsKilled,
+ numBatchesProcessed
+ );
return TaskStatus.success(getId());
}
+ @JsonIgnore
+ @VisibleForTesting
+ @Nullable
+ Integer getNumTotalBatches()
+ {
+ return null != limit ? (int) Math.ceil((double) limit / batchSize) : null;
+ }
+
+ @JsonIgnore
+ @VisibleForTesting
+ int computeNextBatchSize(int numSegmentsKilled)
+ {
+ return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;
Review Comment:
We should add a guard here, and/or a test, to ensure the result is always >= 1.
Paper scenario: `limit = 100`, `batchSize = 999999`. After the first iteration
(assuming at least 100 unused segments exist), `numSegmentsKilled` will be 100.
Then `computeNextBatchSize(100)` returns `0`, and `setMaxRows(0)` means
`unlimited`. I think this is unintended.
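
To make the paper scenario concrete, here is a minimal stand-alone sketch. It assumes the same expression as in the diff, but passes `limit` and `batchSize` as parameters so it runs on its own. The class name and the `hasRemainingQuota` helper are hypothetical, one possible shape for a guard rather than the PR's code.

```java
// Hypothetical stand-alone sketch; names mirror the diff, but this is not the PR's code.
final class KillBatchSizeSketch
{
  /** Same expression as in the diff: returns 0 once numSegmentsKilled reaches the limit. */
  static int computeNextBatchSize(Integer limit, int batchSize, int numSegmentsKilled)
  {
    return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;
  }

  /** Possible guard (an assumption): only fetch again while the limit is not exhausted. */
  static boolean hasRemainingQuota(Integer limit, int numSegmentsKilled)
  {
    return null == limit || numSegmentsKilled < limit;
  }

  public static void main(String[] args)
  {
    // limit = 100, batchSize = 999999, 100 segments already killed.
    System.out.println("unguarded next batch size: " + computeNextBatchSize(100, 999999, 100));
    System.out.println("should fetch again: " + hasRemainingQuota(100, 100));
  }
}
```

With a check like `hasRemainingQuota` in the loop condition, the zero batch size would never reach the query.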
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -186,19 +212,48 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);
numBatchesProcessed++;
+ numSegmentsKilled += unusedSegments.size();
if (numBatchesProcessed % 10 == 0) {
- LOG.info("Processed [%d/%d] batches for kill task[%s].",
- numBatchesProcessed, unusedSegmentBatches.size(), getId());
+ if (null != numTotalBatches) {
+ LOG.info("Processed [%d/%d] batches for kill task[%s].",
+ numBatchesProcessed, numTotalBatches, getId()
+ );
+ } else {
Review Comment:
I think we can remove this `if ()` branch by logging a separate entry once,
above the loop, something like `Starting kill with batchSize[%d], up to
limit[%d] segments will be deleted ([%d] batches)`, which simplifies the inner
loop.
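
A rough sketch of that split, using `String.format` as a stand-in for the task logger. The class and method names are hypothetical, and the wording for the no-limit case is my assumption:

```java
// Hypothetical stand-alone sketch; not the PR's code. String.format stands in for the task logger.
final class KillProgressLogSketch
{
  /** Built once before the loop; the only place that needs to know whether a limit is set. */
  static String startMessage(int batchSize, Integer limit, Integer numTotalBatches)
  {
    if (null == limit) {
      // Wording for the unlimited case is an assumption.
      return String.format("Starting kill with batchSize[%d], no limit on segments to delete", batchSize);
    }
    return String.format(
        "Starting kill with batchSize[%d], up to limit[%d] segments will be deleted ([%d] batches)",
        batchSize, limit, numTotalBatches
    );
  }

  /** Logged inside the loop; same format whether or not a limit is set. */
  static String progressMessage(int numBatchesProcessed, String taskId)
  {
    return String.format("Processed [%d] batches for kill task[%s].", numBatchesProcessed, taskId);
  }

  public static void main(String[] args)
  {
    System.out.println(startMessage(10, 100, 10));
    System.out.println(progressMessage(10, "kill_task_example"));
  }
}
```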
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -190,12 +190,22 @@ public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(Strin
@Override
public List<DataSegment> retrieveUnusedSegmentsForInterval(final String dataSource, final Interval interval)
+ {
+ return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
Review Comment:
... like this one does.
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -123,17 +123,30 @@ Collection<DataSegment> retrieveUsedSegmentsForIntervals(
Segments visibility
);
+ /**
+ * see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)}
+ */
+ List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval);
+
/**
* Retrieve all published segments which include ONLY data within the given interval and are marked as unused from the
* metadata store.
*
- * @param dataSource The data source the segments belong to
- * @param interval Filter the data segments to ones that include data in this interval exclusively.
+ * @param dataSource The data source the segments belong to
+ * @param interval Filter the data segments to ones that include data in this interval exclusively.
+ * @param maxSegments The maximum number of unused segments to retreive. If null, no limit is applied.
*
* @return DataSegments which include ONLY data within the requested interval and are marked as unused. Segments NOT
* returned here may include data in the interval
*/
- List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval);
+ default List<DataSegment> retrieveUnusedSegmentsForInterval(
+ String dataSource,
+ Interval interval,
+ @Nullable Integer maxSegments
Review Comment:
This behaviour is a little confusing to me, since the `default`
implementation does not fulfill the contract. Perhaps it should be reversed
instead: provide a `default` for `retrieveUnusedSegmentsForInterval(String,
Interval)` that calls `retrieveUnusedSegmentsForInterval(dataSource, interval,
null)`, and leave the three-argument method abstract.
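
A simplified sketch of the reversed arrangement. The interface and class names are hypothetical, and `String` stands in for `DataSegment` and `Interval` so the example compiles on its own:

```java
import java.util.List;

// Hypothetical, simplified stand-in for the coordinator interface; not the PR's code.
interface UnusedSegmentSourceSketch
{
  /** Convenience overload: the default simply delegates with no limit. */
  default List<String> retrieveUnusedSegmentsForInterval(String dataSource, String interval)
  {
    return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
  }

  /** Implementations must supply the limit-aware version; a null maxSegments means no limit. */
  List<String> retrieveUnusedSegmentsForInterval(String dataSource, String interval, Integer maxSegments);
}

// Toy in-memory implementation, used only to exercise the interface.
final class InMemorySourceSketch implements UnusedSegmentSourceSketch
{
  private final List<String> segments;

  InMemorySourceSketch(List<String> segments)
  {
    this.segments = segments;
  }

  @Override
  public List<String> retrieveUnusedSegmentsForInterval(String dataSource, String interval, Integer maxSegments)
  {
    // Cap the result at maxSegments when a limit is given.
    int end = null == maxSegments ? segments.size() : Math.min(maxSegments, segments.size());
    return segments.subList(0, end);
  }
}
```

This way every implementation has to honour `maxSegments`, and the two-argument overload cannot drift from the documented contract.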
##########
server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java:
##########
@@ -40,21 +42,27 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
private final Interval interval;
private final Boolean markAsUnused;
private final Integer batchSize;
+ @Nullable private final Integer limit;
@JsonCreator
public ClientKillUnusedSegmentsTaskQuery(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
- @JsonProperty("markAsUnused") Boolean markAsUnused,
- @JsonProperty("batchSize") Integer batchSize
+ @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
+ @JsonProperty("batchSize") Integer batchSize,
+ @JsonProperty("limit") Integer limit
)
{
this.id = Preconditions.checkNotNull(id, "id");
this.dataSource = dataSource;
this.interval = interval;
this.markAsUnused = markAsUnused;
this.batchSize = batchSize;
+ if (null != limit) {
+ Preconditions.checkArgument(limit > 0, "limit must be > 0");
Review Comment:
nit: This could probably be `checkArgument(limit == null || limit > 0, ...)`.
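
For illustration, a stand-alone sketch of the single-call form. The local `checkArgument` is a stand-in for Guava's `Preconditions.checkArgument(boolean, Object)` so the example runs without dependencies, and the class and `validateLimit` names are hypothetical:

```java
// Hypothetical stand-alone sketch; checkArgument is a local stand-in for Guava's Preconditions.checkArgument.
final class LimitCheckSketch
{
  static void checkArgument(boolean expression, String errorMessage)
  {
    if (!expression) {
      throw new IllegalArgumentException(errorMessage);
    }
  }

  static Integer validateLimit(Integer limit)
  {
    // `||` short-circuits, so `limit > 0` is never evaluated (or unboxed) when limit is null.
    checkArgument(limit == null || limit > 0, "limit must be > 0");
    return limit;
  }

  public static void main(String[] args)
  {
    System.out.println(validateLimit(null));
    System.out.println(validateLimit(5));
  }
}
```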