kfaraz commented on code in PR #15710:
URL: https://github.com/apache/druid/pull/15710#discussion_r1456767074


##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -64,16 +71,16 @@ public class KillUnusedSegments implements CoordinatorDuty
   private static final Logger log = new Logger(KillUnusedSegments.class);
 
   private final long period;

Review Comment:
   Nit: Since we are touching this code anyway, maybe make these fields 
(`period` and `durationToRetain`) `Duration` instead of `long`. That would make 
some of the logs more readable. 
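For illustration, a minimal sketch of this nit (hypothetical class and field names; `java.time.Duration` stands in for the Joda-Time `Duration` that Druid's config actually returns, just to keep the example self-contained):

```java
import java.time.Duration;

// Sketch of the suggestion: holding the config values as Duration instead of
// long millis makes log output human-readable, e.g. "PT24H" rather than
// "86400000". Names here are illustrative, not actual Druid code.
public class DurationFieldsSketch {
    private final Duration period;
    private final Duration durationToRetain;

    DurationFieldsSketch(Duration period, Duration durationToRetain) {
        this.period = period;
        this.durationToRetain = durationToRetain;
    }

    // Mirrors the style of the log lines in the PR: Duration.toString()
    // renders ISO-8601 (e.g. PT24H) instead of a raw millisecond count.
    String describe() {
        return "period[" + period + "], durationToRetain[" + durationToRetain + "]";
    }
}
```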



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -159,10 +176,9 @@ DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params)
         dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
       }
 
-      log.debug("Killing unused segments in datasources: %s", dataSourcesToKill);
-      lastKillTime = System.currentTimeMillis();
+      log.debug("Killing unused segments for datasources[%s]", dataSourcesToKill);
+      lastKillTime = DateTimes.nowUtc();

Review Comment:
   Nit: Wondering if we should update `lastKillTime` after the kill task has 
been successfully submitted?
   
   It would have the benefit of not updating `lastKillTime` if there was an error in submitting the kill task. Thus, the duty would retry the submission in the next coordinator run.
   The drawback is that if there is some bug and submitting a kill task fails once, this duty would keep trying to submit a kill task in every coordinator run and failing, flooding the log with errors. I guess that's okay?
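A hypothetical sketch of the ordering being suggested (the names `runOnce` and `submitKillTask` are illustrative stand-ins, not actual Druid code):

```java
import java.time.Instant;

// Sketch: advance lastKillTime only after the kill task was submitted
// successfully, so a failed submission is retried on the next coordinator run.
public class KillSubmitSketch {
    private Instant lastKillTime = Instant.EPOCH;

    void runOnce(boolean submitSucceeds) {
        try {
            submitKillTask(submitSucceeds);
            lastKillTime = Instant.now(); // only advanced on success
        } catch (RuntimeException e) {
            // lastKillTime unchanged; the duty retries on the next run
        }
    }

    // Stand-in for the real task submission, which may fail.
    private void submitKillTask(boolean succeed) {
        if (!succeed) {
            throw new RuntimeException("submit failed");
        }
    }

    Instant getLastKillTime() {
        return lastKillTime;
    }
}
```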



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -158,6 +159,14 @@ public void setup()
         overlordClient,
         config
     );
+    while (!target.canDutyRun()) {

Review Comment:
   This doesn't seem clean. We shouldn't have to sleep in a loop in a test, especially when we are not actually verifying anything in or after the loop.
   
   Instead, we should try to use config values in the test that ensure that the 
duty is ready to run right after it has been created.
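The idea can be sketched with hypothetical names (this is not the actual `KillUnusedSegments` code): if the initial `lastKillTime` plus the period is already in the past, `canDutyRun()` is true right after construction and the test needs no sleep loop.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative stand-in for the duty's readiness check.
public class DutyReadySketch {
    private final Instant lastKillTime;
    private final Duration period;

    DutyReadySketch(Instant lastKillTime, Duration period) {
        this.lastKillTime = lastKillTime;
        this.period = period;
    }

    // Runnable once lastKillTime + period has passed; choosing test values
    // such as lastKillTime = Instant.EPOCH makes this true immediately.
    boolean canDutyRun() {
        return !Instant.now().isBefore(lastKillTime.plus(period));
    }
}
```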



##########
indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java:
##########
@@ -132,6 +133,27 @@ public List<DataSegment> retrieveUnusedSegmentsForInterval(
     }
   }
 
+  @Override
+  public List<DataSegment> retrieveUnusedSegmentsForInterval(
+      String dataSource,
+      Interval interval,
+      @Nullable Integer limit,
+      @Nullable DateTime maxUsedStatusLastUpdatedTime
+  )
+  {
+    synchronized (unusedSegments) {
+      Stream<DataSegment> resultStream = unusedSegments.stream();
+
+      resultStream = resultStream.filter(ds -> !nuked.contains(ds));

Review Comment:
   Maybe chain here?
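For illustration, the reassignment can be written as one chained pipeline (plain strings stand in for `DataSegment`, and `unusedNotNuked` is a hypothetical helper, not Druid code):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of the "chain here" suggestion: build one chained stream pipeline
// instead of reassigning an intermediate Stream variable.
public class ChainSketch {
    static List<String> unusedNotNuked(List<String> unusedSegments, Set<String> nuked, Integer limit) {
        return unusedSegments.stream()
            .filter(s -> !nuked.contains(s))               // drop already-nuked segments
            .limit(limit == null ? Long.MAX_VALUE : limit) // apply the optional limit
            .collect(Collectors.toList());
    }
}
```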



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -85,46 +92,56 @@ public KillUnusedSegments(
       DruidCoordinatorConfig config
   )
   {
+    if (config.getCoordinatorKillPeriod().getMillis() < config.getCoordinatorIndexingPeriod().getMillis()) {
+      throw InvalidInput.exception(
+          "druid.coordinator.kill.period[%s] must be >= druid.coordinator.period.indexingPeriod[%s]",
+          config.getCoordinatorKillPeriod(),
+          config.getCoordinatorIndexingPeriod()
+      );
+    }
+    if (config.getCoordinatorKillMaxSegments() < 0) {
+      throw InvalidInput.exception(
+          "druid.coordinator.kill.maxSegments[%s] is invalid. It must be a positive integer.",
+          config.getCoordinatorKillMaxSegments()
+      );
+    }
     this.period = config.getCoordinatorKillPeriod().getMillis();
-    Preconditions.checkArgument(
-        this.period >= config.getCoordinatorIndexingPeriod().getMillis(),
-        "coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod"
-    );
-
-    this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain();
-    this.retainDuration = config.getCoordinatorKillDurationToRetain().getMillis();
-    if (this.ignoreRetainDuration) {
+    this.ignoreDurationToRetain = config.getCoordinatorKillIgnoreDurationToRetain();
+    this.durationToRetain = config.getCoordinatorKillDurationToRetain().getMillis();
+    if (this.ignoreDurationToRetain) {
       log.debug(
-          "druid.coordinator.kill.durationToRetain [%s] will be ignored when discovering segments to kill "
-          + "because you have set druid.coordinator.kill.ignoreDurationToRetain to True.",
-          this.retainDuration
+          "druid.coordinator.kill.durationToRetain[%s] will be ignored when discovering segments to kill "
+          + "because druid.coordinator.kill.ignoreDurationToRetain is set to true.",
+          this.durationToRetain
       );
     }
     this.bufferPeriod = config.getCoordinatorKillBufferPeriod().getMillis();
 
     this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments();
-    Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
-
     datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();
 
     log.info(
-        "Kill Task scheduling enabled with period [%s], retainDuration [%s], bufferPeriod [%s], maxSegmentsToKill [%s]",
+        "Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s]",
         this.period,
-        this.ignoreRetainDuration ? "IGNORING" : this.retainDuration,
+        this.ignoreDurationToRetain ? "IGNORING" : this.durationToRetain,
         this.bufferPeriod,
         this.maxSegmentsToKill
     );
 
     this.segmentsMetadataManager = segmentsMetadataManager;
     this.overlordClient = overlordClient;
+    this.lastKillTime = DateTimes.nowUtc();
   }
 
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
-    final long currentTimeMillis = System.currentTimeMillis();
-    if (lastKillTime + period > currentTimeMillis) {
-      log.debug("Skipping kill of unused segments as kill period has not elapsed yet.");
+    if (!canDutyRun()) {
+      log.debug(
+          "Skipping KillUnusedSegments until period[%s] have elapsed after lastKillTime[%s].",

Review Comment:
   ```suggestion
          "Skipping KillUnusedSegments until period[%s] has elapsed after lastKillTime[%s].",
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -270,6 +296,12 @@ private int getAvailableKillTaskSlots(int killTaskCapacity, int numActiveKillTas
     );
   }
 
+  @VisibleForTesting

Review Comment:
   Nit:
   Can we do something other than making this method visible?
   We could just test the `run` method itself.
   
   Druid code is already flooded with this annotation and we seem to keep 
adding more. It really defeats the point of having public and private methods. 
In some cases, where the code is really involved, it probably helps to test out 
individual methods. But this one seems to be a simple case that can be tested 
in other ways.



##########
docs/data-management/delete.md:
##########
@@ -106,7 +107,8 @@ Some of the parameters used in the task payload are further explained below:
 
 | Parameter   | Default         | Explanation |
 |-------------|-----------------|-------------|
 | `batchSize`    |100    | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.|
-| `limit`     | null - no limit | Maximum number of segments for the kill task to delete.|
+| `limit`     | null (no limit) | Maximum number of segments for the kill task to delete.|
+| `maxUsedStatusLastUpdatedTime` | null (no cutoff) | Represents the maximum timestamp used as a cutoff to include unused segments. The kill task considers segments in the specified `interval` marked as unused no later than this time. The default behavior is to kill all unused segments in the `interval` regardless of the time when segments were marked as unused.|

Review Comment:
   ```suggestion
   | `maxUsedStatusLastUpdatedTime` | null (no cutoff) | Maximum timestamp used as a cutoff to include unused segments. The kill task only considers segments which lie in the specified `interval` and were marked as unused no later than this time. The default behavior is to kill all unused segments in the `interval` regardless of when they were marked as unused.|
   ```



##########
server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java:
##########
@@ -72,21 +73,28 @@ public interface OverlordClient
   ListenableFuture<Void> runTask(String taskId, Object taskObject);
 
   /**
-   * Run a "kill" task for a particular datasource and interval. Shortcut to {@link #runTask(String, Object)}.
-   *
-   * The kill task deletes all unused segment records from deep storage and the metadata store. The task runs
-   * asynchronously after the API call returns. The resolved future is the ID of the task, which can be used to
-   * monitor its progress through the {@link #taskStatus(String)} API.
-   *
-   * @param idPrefix   Descriptive prefix to include at the start of task IDs
-   * @param dataSource Datasource to kill
-   * @param interval   Interval to kill
-   *
-   * @return future with task ID
+   * Shortcut to {@link #runKillTask(String, String, Interval, Integer, DateTime)}.
+   * Note that this method is deprecated and newer implementations must use {@link #runKillTask(String, String, Interval, Integer, DateTime)}.
    */
+  @Deprecated

Review Comment:
   Why deprecate it? This is not a public API, you can just get rid of the 
older methods. (unless there are rolling upgrade concerns, which doesn't seem 
to be the case since an Overlord on an old version would just get a null value 
of `maxUsedLastUpdated`)



##########
docs/operations/clean-metadata-store.md:
##########
@@ -86,6 +86,8 @@ Only applies to the specified datasources in the dynamic configuration parameter
 If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources.
 - `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`.
 - `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation that segments become eligible for deletion.
+- `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator will consider all unused segments as eligible to be killed.

Review Comment:
   ```suggestion
   - `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator considers all unused segments as eligible to be killed.
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -85,46 +92,56 @@ public KillUnusedSegments(
       DruidCoordinatorConfig config
   )
   {
+    if (config.getCoordinatorKillPeriod().getMillis() < config.getCoordinatorIndexingPeriod().getMillis()) {
+      throw InvalidInput.exception(
+          "druid.coordinator.kill.period[%s] must be >= druid.coordinator.period.indexingPeriod[%s]",
+          config.getCoordinatorKillPeriod(),
+          config.getCoordinatorIndexingPeriod()
+      );
+    }
+    if (config.getCoordinatorKillMaxSegments() < 0) {
+      throw InvalidInput.exception(
+          "druid.coordinator.kill.maxSegments[%s] is invalid. It must be a positive integer.",
+          config.getCoordinatorKillMaxSegments()
+      );
+    }
     this.period = config.getCoordinatorKillPeriod().getMillis();
-    Preconditions.checkArgument(
-        this.period >= config.getCoordinatorIndexingPeriod().getMillis(),
-        "coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod"
-    );
-
-    this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain();
-    this.retainDuration = config.getCoordinatorKillDurationToRetain().getMillis();
-    if (this.ignoreRetainDuration) {
+    this.ignoreDurationToRetain = config.getCoordinatorKillIgnoreDurationToRetain();
+    this.durationToRetain = config.getCoordinatorKillDurationToRetain().getMillis();
+    if (this.ignoreDurationToRetain) {
       log.debug(
-          "druid.coordinator.kill.durationToRetain [%s] will be ignored when discovering segments to kill "
-          + "because you have set druid.coordinator.kill.ignoreDurationToRetain to True.",
-          this.retainDuration
+          "druid.coordinator.kill.durationToRetain[%s] will be ignored when discovering segments to kill "
+          + "because druid.coordinator.kill.ignoreDurationToRetain is set to true.",
+          this.durationToRetain
       );
     }
     this.bufferPeriod = config.getCoordinatorKillBufferPeriod().getMillis();
 
     this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments();
-    Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
-
     datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();
 
     log.info(
-        "Kill Task scheduling enabled with period [%s], retainDuration [%s], bufferPeriod [%s], maxSegmentsToKill [%s]",
+        "Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s]",
         this.period,
-        this.ignoreRetainDuration ? "IGNORING" : this.retainDuration,
+        this.ignoreDurationToRetain ? "IGNORING" : this.durationToRetain,
         this.bufferPeriod,
         this.maxSegmentsToKill
     );
 
     this.segmentsMetadataManager = segmentsMetadataManager;
     this.overlordClient = overlordClient;
+    this.lastKillTime = DateTimes.nowUtc();

Review Comment:
   In the original code, `lastKillTime` was 0. Setting this to `now` would mean 
that the first kill task would come after the coordinator has already run for 1 
kill period. Whereas in the original code, it would have come right after the 
coordinator started.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java:
##########
@@ -42,16 +43,21 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment
   @JsonIgnore
   private final Integer limit;
 
+  @JsonIgnore

Review Comment:
   Unrelated to this PR:
   This is a weird style that seems to be used in some task actions. Not sure 
why we even need `@JsonIgnore` here when we are already exposing a getter 
annotated with `@JsonProperty`. Other task actions, like 
`SegmentAllocateAction` don't do this.



##########
docs/operations/clean-metadata-store.md:
##########
@@ -86,6 +86,8 @@ Only applies to the specified datasources in the dynamic configuration parameter
 If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources.
 - `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`.
 - `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation that segments become eligible for deletion.
+- `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator will consider all unused segments as eligible to be killed.
+- `druid.coordinator.kill.bufferPeriod`: Defines the amount of time that a segment must be unused before it is able to be permanently removed from metadata and deep storage. This serves as a buffer period to prevent data loss if data ends up being needed after being marked unused.

Review Comment:
   Thanks for adding these in the docs!
   
   ```suggestion
   - `druid.coordinator.kill.bufferPeriod`: Defines the amount of time that a segment must be unused before it can be permanently removed from metadata and deep storage. This serves as a buffer period to prevent data loss if data ends up being needed after being marked unused.
   ```



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -127,11 +128,23 @@ Collection<DataSegment> retrieveUsedSegmentsForIntervals(
   );
 
   /**
-   * see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)}
+   * See {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer, DateTime)}.
    */
   default List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval)
   {
-    return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
+    return retrieveUnusedSegmentsForInterval(dataSource, interval, null, null);
+  }
+
+  /**
+   * See {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer, DateTime)}.
+   */
+  default List<DataSegment> retrieveUnusedSegmentsForInterval(

Review Comment:
   We should maybe get rid of some of these methods as they seem to be used 
only in tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

