kfaraz commented on code in PR #14642:
URL: https://github.com/apache/druid/pull/14642#discussion_r1278305137
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -60,6 +61,10 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
+ // We split this to try and keep each nuke operation relatively short, in the case that either
+ // the database or the storage layer is particularly slow.
+ private static final int SEGMENT_NUKE_BATCH_SIZE = 10_000;
Review Comment:
Cool, breaking into batches certainly can't hurt, since we already take care
not to delete S3 files for segments whose metadata hasn't been deleted. If this
helps make the `TaskLockbox` more granular, we should definitely do this.
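For illustration, the batching approach under discussion can be sketched with a plain-Java stand-in for Guava's `Lists.partition` (the class and method names below are hypothetical, not from the PR):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split a list into fixed-size batches so that each
// nuke call stays short, mirroring what Lists.partition does in the PR.
class BatchingSketch
{
  public static <T> List<List<T>> partition(List<T> list, int batchSize)
  {
    final List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < list.size(); i += batchSize) {
      // The final batch may be smaller than batchSize.
      batches.add(list.subList(i, Math.min(i + batchSize, list.size())));
    }
    return batches;
  }
}
```

Note that Guava's `Lists.partition` returns lazy views over the backing list; the sketch above is only meant to show the shape of the batching.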
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -114,23 +141,38 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
}
// List unused segments
- final List<DataSegment> unusedSegments = toolbox
+ final List<DataSegment> allUnusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval()));
- if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
- throw new ISE(
- "Locks[%s] for task[%s] can't cover segments[%s]",
- taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
- getId(),
- unusedSegments
- );
+ final List<List<DataSegment>> unusedSegmentBatches = Lists.partition(allUnusedSegments, batchSize);
+
Review Comment:
Please add a log here for total number of batches created and `batchSize`.
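A sketch of what the requested log message might look like, using `String.format` here as a stand-in for the Druid `Logger` formatting (the message text and helper name are assumptions, not from the PR):

```java
// Hypothetical helper producing the suggested log message: total segments,
// number of batches created, and the configured batchSize.
class KillBatchLogSketch
{
  public static String batchSummary(int totalSegments, int numBatches, int batchSize)
  {
    return String.format(
        "Killing [%d] unused segments in [%d] batches of size [%d].",
        totalSegments,
        numBatches,
        batchSize
    );
  }
}
```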
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -60,15 +63,23 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
+ // We split this to try and keep each nuke operation relatively short, in the case that either
Review Comment:
This comment should be moved into a javadoc for the field `batchSize`.
For this constant, add a different comment instead (preferably javadoc-style)
that sheds some light on why this exact value was chosen.
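One possible shape for the suggested javadoc (wording is illustrative only; the stated rationale for the default value is an assumption the author would need to confirm):

```java
/**
 * Default nuke batch size. (Hypothetical rationale: small enough to yield the
 * lockbox frequently, large enough to avoid excessive per-batch overhead; the
 * author should document the actual reasoning for choosing 100.)
 */
private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;

/**
 * Number of segments nuked per batch. We split the segment list into batches
 * to keep each nuke operation relatively short, in case either the metadata
 * store or the storage layer is particularly slow.
 */
private final int batchSize;
```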
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -60,15 +63,23 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
+ // We split this to try and keep each nuke operation relatively short, in the case that either
+ // the database or the storage layer is particularly slow.
+ private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;
+
private final boolean markAsUnused;
+ private final int batchSize;
Review Comment:
Javadoc for this parameter would be nice.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -114,23 +141,38 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
}
// List unused segments
- final List<DataSegment> unusedSegments = toolbox
+ final List<DataSegment> allUnusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval()));
- if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
- throw new ISE(
- "Locks[%s] for task[%s] can't cover segments[%s]",
- taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
- getId(),
- unusedSegments
- );
+ final List<List<DataSegment>> unusedSegmentBatches = Lists.partition(allUnusedSegments, batchSize);
+
+ // The individual activities here on the toolbox have possibility to run for a longer period of time,
+ // since they involve calls to metadata storage and archival object storage. And, the tasks take hold of the
+ // task lockbox to run. By splitting the segment list into smaller batches, we have an opportunity to yield the
+ // lock to other activity that might need to happen using the overlord tasklockbox.
+
+ for (final List<DataSegment> unusedSegments : unusedSegmentBatches) {
+ if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
+ throw new ISE(
+ "Locks[%s] for task[%s] can't cover segments[%s]",
+ taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
+ getId(),
+ unusedSegments
+ );
+ }
+
+ // Kill segments:
+ // Order is important here: we want the nuke action to clean up the metadata records _before_ the
+ // segments are removed from storage, this helps maintain that we will always have a storage segment if
+ // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
+ // abandoned.
+
+ toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
+ toolbox.getDataSegmentKiller().kill(unusedSegments);
+ countBatchesIssued++;
Review Comment:
Yeah, logging some progress would be nice.
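A sketch of the kind of progress message this loop could emit after each batch, again using `String.format` as a stand-in for the Druid `Logger` (the message text and helper name are assumptions):

```java
// Hypothetical progress line logged after each batch completes, so operators
// can see how far a long-running kill task has gotten.
class KillProgressLogSketch
{
  public static String progress(long batchesDone, int totalBatches)
  {
    return String.format("Processed [%d/%d] batches for kill task.", batchesDone, totalBatches);
  }
}
```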
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -60,15 +63,23 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
+ // We split this to try and keep each nuke operation relatively short, in the case that either
+ // the database or the storage layer is particularly slow.
+ private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;
+
private final boolean markAsUnused;
+ private final int batchSize;
+
+ private long countBatchesIssued = 0;
Review Comment:
```suggestion
private long numBatchesProcessed;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]