capistrant commented on code in PR #18028:
URL: https://github.com/apache/druid/pull/18028#discussion_r2121667766
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java:
##########
@@ -219,15 +222,18 @@ private boolean shouldResetKillQueue()
}
/**
- * Resets the kill queue with fresh jobs.
+ * Clears the kill queue and adds fresh jobs.
* This method need not handle race conditions as it is always run on
* {@link #exec} which is single-threaded.
*/
- private void resetKillQueue()
+ private void rebuildKillQueue()
{
final Stopwatch resetDuration = Stopwatch.createStarted();
try {
killQueue.clear();
+ if (!leaderSelector.isLeader()) {
Review Comment:
   Did you mean to add an early return in this new conditional block, to prevent the queue rebuild on a non-leader?
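   For illustration, a minimal sketch of the early-return pattern this comment suggests. The class and member names below are hypothetical stand-ins, not the actual `UnusedSegmentsKiller` internals:

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.function.BooleanSupplier;

   // Hypothetical stand-in for the kill-queue rebuild logic under discussion.
   class KillQueueRebuilder
   {
     private final Deque<String> killQueue = new ArrayDeque<>();
     private final BooleanSupplier isLeader;

     KillQueueRebuilder(BooleanSupplier isLeader)
     {
       this.isLeader = isLeader;
     }

     /** Clears the queue, then re-populates it only if we are currently the leader. */
     void rebuildKillQueue(Iterable<String> freshJobs)
     {
       killQueue.clear();
       if (!isLeader.getAsBoolean()) {
         // Early return: a non-leader clears but never rebuilds the queue.
         return;
       }
       freshJobs.forEach(killQueue::add);
     }

     int queueSize()
     {
       return killQueue.size();
     }
   }
   ```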
##########
server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java:
##########
@@ -39,14 +40,24 @@ public class SegmentsMetadataManagerConfig
@JsonProperty
private final SegmentMetadataCache.UsageMode useIncrementalCache;
+ @JsonProperty
+ private final UnusedSegmentKillerConfig killUnused;
+
@JsonCreator
public SegmentsMetadataManagerConfig(
@JsonProperty("pollDuration") Period pollDuration,
-      @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode useIncrementalCache
+      @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode useIncrementalCache,
+      @JsonProperty("killUnused") UnusedSegmentKillerConfig killUnused
)
{
     this.pollDuration = Configs.valueOrDefault(pollDuration, Period.minutes(1));
     this.useIncrementalCache = Configs.valueOrDefault(useIncrementalCache, SegmentMetadataCache.UsageMode.NEVER);
+    this.killUnused = Configs.valueOrDefault(killUnused, new UnusedSegmentKillerConfig(null, null));
+    if (this.killUnused.isEnabled() && this.useIncrementalCache == SegmentMetadataCache.UsageMode.NEVER) {
Review Comment:
   I don't love the idea of rejecting tasks based on prefix. Since this is not a risk to the correctness or health of the underlying cluster, I think we should leave it as is, with good documentation suggesting that users update their Coordinator configs when using the new embedded kill.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java:
##########
@@ -73,26 +73,26 @@ public Void perform(Task task, TaskActionToolbox toolbox)
try {
       final Set<Interval> intervals = segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet());
- toolbox.getTaskLockbox().doInCriticalSection(
+ int numDeletedSegments = toolbox.getTaskLockbox().doInCriticalSection(
task,
intervals,
- CriticalAction.builder().onValidLocks(
- () -> {
-              int numDeletedSegments = toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
-              log.info(
-                  "Deleted [%d] segments out of requested[%d] from"
-                  + " metadata store for task[%s], datasource[%s], intervals[%s].",
-                  numDeletedSegments, segments.size(), task.getId(), task.getDataSource(), intervals
-              );
- return numDeletedSegments;
- }
+ CriticalAction.<Integer>builder().onValidLocks(
+            () -> toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments)
).onInvalidLocks(
() -> {
               throw new ISE("Some locks for task[%s] are already revoked", task.getId());
}
).build()
);
+
+      final Set<Interval> sampleIntervals = intervals.stream().limit(5).collect(Collectors.toSet());
Review Comment:
   Will this change to logging only a sample of 5 intervals potentially need to change based on what we decide about Gian's note that all deletes should be auditable in the logs?
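   To make the trade-off concrete, here is a hedged sketch of the sampling pattern in the diff (`stream().limit(5)`), using a generic helper rather than the actual Druid types. Note that for an unordered `Set` the sampled elements are arbitrary, which is exactly why a full audit trail would require logging every interval:

   ```java
   import java.util.LinkedHashSet;
   import java.util.Set;
   import java.util.stream.Collectors;

   // Hypothetical helper mirroring intervals.stream().limit(5) in the diff above.
   class IntervalSampler
   {
     /**
      * Returns at most {@code limit} elements of the given set for logging.
      * Which elements are chosen depends on the set's iteration order, so
      * this is a sample, not a complete audit record.
      */
     static Set<String> sample(Set<String> intervals, int limit)
     {
       return intervals.stream()
                       .limit(limit)
                       .collect(Collectors.toCollection(LinkedHashSet::new));
     }
   }
   ```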
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]