kfaraz commented on code in PR #15710:
URL: https://github.com/apache/druid/pull/15710#discussion_r1456767074
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -64,16 +71,16 @@ public class KillUnusedSegments implements CoordinatorDuty
private static final Logger log = new Logger(KillUnusedSegments.class);
private final long period;
Review Comment:
Nit: Since we are touching this code anyway, maybe make these fields
(`period` and `durationToRetain`) `Duration` instead of `long`. That would make
some of the logs more readable.
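A minimal sketch of what that could look like (illustrative only: the real fields live in `KillUnusedSegments` and Druid uses Joda-Time's `Duration`; `java.time.Duration` is used here to keep the example self-contained):

```java
import java.time.Duration;

// Illustrative sketch, not the actual Druid class: holding the period as a
// Duration instead of a long makes log output human-readable.
class KillConfigSketch
{
  private final Duration period;

  KillConfigSketch(long periodMillis)
  {
    this.period = Duration.ofMillis(periodMillis);
  }

  String describe()
  {
    // Logging the Duration renders e.g. "PT24H" instead of "86400000".
    return "Kill task scheduling enabled with period[" + period + "]";
  }
}
```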
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -159,10 +176,9 @@ DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params)
        dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
      }
-     log.debug("Killing unused segments in datasources: %s", dataSourcesToKill);
-     lastKillTime = System.currentTimeMillis();
+     log.debug("Killing unused segments for datasources[%s]", dataSourcesToKill);
+     lastKillTime = DateTimes.nowUtc();
Review Comment:
Nit: Wondering if we should update `lastKillTime` after the kill task has
been successfully submitted?
It would have the benefit of not updating `lastKillTime` if there was an
error in submitting the kill task. Thus, the duty would retry the submission in the
next coordinator run.
Drawback is that if there is some bug and submitting a kill task failed
once, this duty would keep trying to submit a kill task in every coordinator
run and failing, flooding the log with errors. I guess that's okay?
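A rough sketch of the suggested ordering (hypothetical stand-ins, not the actual duty code): `lastKillTime` only advances once the submission succeeds, so a failed submit leaves the duty eligible to retry on the next run.

```java
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical sketch of the suggestion: advance lastKillTime only after a
// successful kill-task submission; a failure leaves it untouched so the duty
// retries on the next coordinator run.
class KillDutySketch
{
  private Instant lastKillTime = Instant.EPOCH;

  boolean runOnce(Supplier<Boolean> submitKillTask)
  {
    final boolean submitted = submitKillTask.get();
    if (submitted) {
      lastKillTime = Instant.now();  // moved after successful submission
    }
    return submitted;
  }

  Instant getLastKillTime()
  {
    return lastKillTime;
  }
}
```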
##########
server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java:
##########
@@ -158,6 +159,14 @@ public void setup()
overlordClient,
config
);
+ while (!target.canDutyRun()) {
Review Comment:
This doesn't seem clean. We shouldn't have to sleep in a loop in a test esp.
when we are not actually verifying anything in or after the loop.
Instead, we should try to use config values in the test that ensure that the
duty is ready to run right after it has been created.
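One way to express that idea (names here are illustrative stand-ins, not the actual test or config classes): seed the duty's clock state so `canDutyRun()` is true immediately after construction, with no sleep loop.

```java
// Illustrative sketch of testing without a sleep loop: if the duty's
// last-kill timestamp starts one period in the past, it is eligible to run
// as soon as it is constructed. All names are hypothetical stand-ins.
class DutyReadinessSketch
{
  private final long periodMillis;
  private final long lastKillTimeMillis;

  DutyReadinessSketch(long periodMillis, long nowMillis)
  {
    this.periodMillis = periodMillis;
    // Start one full period in the past so tests never have to wait.
    this.lastKillTimeMillis = nowMillis - periodMillis;
  }

  boolean canDutyRun(long nowMillis)
  {
    return lastKillTimeMillis + periodMillis <= nowMillis;
  }
}
```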
##########
indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java:
##########
@@ -132,6 +133,27 @@ public List<DataSegment> retrieveUnusedSegmentsForInterval(
}
}
+ @Override
+ public List<DataSegment> retrieveUnusedSegmentsForInterval(
+ String dataSource,
+ Interval interval,
+ @Nullable Integer limit,
+ @Nullable DateTime maxUsedStatusLastUpdatedTime
+ )
+ {
+ synchronized (unusedSegments) {
+ Stream<DataSegment> resultStream = unusedSegments.stream();
+
+ resultStream = resultStream.filter(ds -> !nuked.contains(ds));
Review Comment:
Maybe chain here?
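The chained form the comment suggests might look like this (simplified stand-in types; the real code filters `DataSegment` objects):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified sketch of the chaining suggestion: one fluent pipeline instead
// of reassigning the intermediate Stream variable.
class ChainSketch
{
  static List<String> unusedNotNuked(List<String> unusedSegments, Set<String> nuked, long limit)
  {
    return unusedSegments.stream()
        .filter(segment -> !nuked.contains(segment))
        .limit(limit)
        .collect(Collectors.toList());
  }
}
```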
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -85,46 +92,56 @@ public KillUnusedSegments(
DruidCoordinatorConfig config
)
{
+    if (config.getCoordinatorKillPeriod().getMillis() < config.getCoordinatorIndexingPeriod().getMillis()) {
+      throw InvalidInput.exception(
+          "druid.coordinator.kill.period[%s] must be >= druid.coordinator.period.indexingPeriod[%s]",
+          config.getCoordinatorKillPeriod(),
+          config.getCoordinatorIndexingPeriod()
+      );
+    }
+    if (config.getCoordinatorKillMaxSegments() < 0) {
+      throw InvalidInput.exception(
+          "druid.coordinator.kill.maxSegments[%s] is invalid. It must be a positive integer.",
+          config.getCoordinatorKillMaxSegments()
+      );
+    }
     this.period = config.getCoordinatorKillPeriod().getMillis();
-    Preconditions.checkArgument(
-        this.period >= config.getCoordinatorIndexingPeriod().getMillis(),
-        "coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod"
-    );
-
-    this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain();
-    this.retainDuration = config.getCoordinatorKillDurationToRetain().getMillis();
-    if (this.ignoreRetainDuration) {
+    this.ignoreDurationToRetain = config.getCoordinatorKillIgnoreDurationToRetain();
+    this.durationToRetain = config.getCoordinatorKillDurationToRetain().getMillis();
+    if (this.ignoreDurationToRetain) {
       log.debug(
-          "druid.coordinator.kill.durationToRetain [%s] will be ignored when discovering segments to kill "
-          + "because you have set druid.coordinator.kill.ignoreDurationToRetain to True.",
-          this.retainDuration
+          "druid.coordinator.kill.durationToRetain[%s] will be ignored when discovering segments to kill "
+          + "because druid.coordinator.kill.ignoreDurationToRetain is set to true.",
+          this.durationToRetain
       );
     }
     this.bufferPeriod = config.getCoordinatorKillBufferPeriod().getMillis();
     this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments();
-    Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
-
     datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();
     log.info(
-        "Kill Task scheduling enabled with period [%s], retainDuration [%s], bufferPeriod [%s], maxSegmentsToKill [%s]",
+        "Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s]",
         this.period,
-        this.ignoreRetainDuration ? "IGNORING" : this.retainDuration,
+        this.ignoreDurationToRetain ? "IGNORING" : this.durationToRetain,
         this.bufferPeriod,
         this.maxSegmentsToKill
     );
     this.segmentsMetadataManager = segmentsMetadataManager;
     this.overlordClient = overlordClient;
+    this.lastKillTime = DateTimes.nowUtc();
   }

   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
-    final long currentTimeMillis = System.currentTimeMillis();
-    if (lastKillTime + period > currentTimeMillis) {
-      log.debug("Skipping kill of unused segments as kill period has not elapsed yet.");
+    if (!canDutyRun()) {
+      log.debug(
+          "Skipping KillUnusedSegments until period[%s] have elapsed after lastKillTime[%s].",
Review Comment:
```suggestion
          "Skipping KillUnusedSegments until period[%s] has elapsed after lastKillTime[%s].",
```
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -270,6 +296,12 @@ private int getAvailableKillTaskSlots(int
killTaskCapacity, int numActiveKillTas
);
}
+ @VisibleForTesting
Review Comment:
Nit:
Can we do something other than making this method visible?
We could just test the `run` method itself.
Druid code is already flooded with this annotation and we seem to keep
adding more. It really defeats the point of having public and private methods.
In some cases, where the code is really involved, it probably helps to test out
individual methods. But this one seems to be a simple case that can be tested
in other ways.
##########
docs/data-management/delete.md:
##########
@@ -106,7 +107,8 @@ Some of the parameters used in the task payload are further explained below:

| Parameter | Default | Explanation |
|-------------|-----------------|-------------|
| `batchSize` | 100 | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.|
-| `limit` | null - no limit | Maximum number of segments for the kill task to delete.|
+| `limit` | null (no limit) | Maximum number of segments for the kill task to delete.|
+| `maxUsedStatusLastUpdatedTime` | null (no cutoff) | Represents the maximum timestamp used as a cutoff to include unused segments. The kill task considers segments in the specified `interval` marked as unused no later than this time. The default behavior is to kill all unused segments in the `interval` regardless of the time when segments where marked as unused.|
Review Comment:
```suggestion
| `maxUsedStatusLastUpdatedTime` | null (no cutoff) | Maximum timestamp used as a cutoff to include unused segments. The kill task only considers segments which lie in the specified `interval` and were marked as unused no later than this time. The default behavior is to kill all unused segments in the `interval` regardless of when they were marked as unused.|
```
##########
server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java:
##########
@@ -72,21 +73,28 @@ public interface OverlordClient
   ListenableFuture<Void> runTask(String taskId, Object taskObject);

   /**
-   * Run a "kill" task for a particular datasource and interval. Shortcut to {@link #runTask(String, Object)}.
-   *
-   * The kill task deletes all unused segment records from deep storage and the metadata store. The task runs
-   * asynchronously after the API call returns. The resolved future is the ID of the task, which can be used to
-   * monitor its progress through the {@link #taskStatus(String)} API.
-   *
-   * @param idPrefix Descriptive prefix to include at the start of task IDs
-   * @param dataSource Datasource to kill
-   * @param interval Interval to kill
-   *
-   * @return future with task ID
+   * Shortcut to {@link #runKillTask(String, String, Interval, Integer, DateTime)}.
+   * Note that this method is deprecated and newer implementations must use {@link #runKillTask(String, String, Interval, Integer, DateTime)}.
    */
+  @Deprecated
Review Comment:
Why deprecate it? This is not a public API, you can just get rid of the
older methods. (unless there are rolling upgrade concerns, which doesn't seem
to be the case since an Overlord on an old version would just get a null value
of `maxUsedLastUpdated`)
##########
docs/operations/clean-metadata-store.md:
##########
@@ -86,6 +86,8 @@ Only applies to the specified datasources in the dynamic configuration parameter
If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources.
- `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`.
- `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation that segments become eligible for deletion.
+- `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator will consider all unused segments as eligible to be killed.
Review Comment:
```suggestion
- `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator considers all unused segments as eligible to be killed.
```
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -85,46 +92,56 @@ public KillUnusedSegments(
DruidCoordinatorConfig config
)
{
+    if (config.getCoordinatorKillPeriod().getMillis() < config.getCoordinatorIndexingPeriod().getMillis()) {
+      throw InvalidInput.exception(
+          "druid.coordinator.kill.period[%s] must be >= druid.coordinator.period.indexingPeriod[%s]",
+          config.getCoordinatorKillPeriod(),
+          config.getCoordinatorIndexingPeriod()
+      );
+    }
+    if (config.getCoordinatorKillMaxSegments() < 0) {
+      throw InvalidInput.exception(
+          "druid.coordinator.kill.maxSegments[%s] is invalid. It must be a positive integer.",
+          config.getCoordinatorKillMaxSegments()
+      );
+    }
     this.period = config.getCoordinatorKillPeriod().getMillis();
-    Preconditions.checkArgument(
-        this.period >= config.getCoordinatorIndexingPeriod().getMillis(),
-        "coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod"
-    );
-
-    this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain();
-    this.retainDuration = config.getCoordinatorKillDurationToRetain().getMillis();
-    if (this.ignoreRetainDuration) {
+    this.ignoreDurationToRetain = config.getCoordinatorKillIgnoreDurationToRetain();
+    this.durationToRetain = config.getCoordinatorKillDurationToRetain().getMillis();
+    if (this.ignoreDurationToRetain) {
       log.debug(
-          "druid.coordinator.kill.durationToRetain [%s] will be ignored when discovering segments to kill "
-          + "because you have set druid.coordinator.kill.ignoreDurationToRetain to True.",
-          this.retainDuration
+          "druid.coordinator.kill.durationToRetain[%s] will be ignored when discovering segments to kill "
+          + "because druid.coordinator.kill.ignoreDurationToRetain is set to true.",
+          this.durationToRetain
       );
     }
     this.bufferPeriod = config.getCoordinatorKillBufferPeriod().getMillis();
     this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments();
-    Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
-
     datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();
     log.info(
-        "Kill Task scheduling enabled with period [%s], retainDuration [%s], bufferPeriod [%s], maxSegmentsToKill [%s]",
+        "Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s]",
         this.period,
-        this.ignoreRetainDuration ? "IGNORING" : this.retainDuration,
+        this.ignoreDurationToRetain ? "IGNORING" : this.durationToRetain,
         this.bufferPeriod,
         this.maxSegmentsToKill
     );
     this.segmentsMetadataManager = segmentsMetadataManager;
     this.overlordClient = overlordClient;
+    this.lastKillTime = DateTimes.nowUtc();
Review Comment:
In the original code, `lastKillTime` was 0. Setting this to `now` would mean
that the first kill task would come after the coordinator has already run for 1
kill period. Whereas in the original code, it would have come right after the
coordinator started.
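The behavioral difference can be sketched like this (illustrative helper, not Druid code): starting from epoch makes the duty eligible on the first coordinator run, while starting from `now` defers the first kill by one full period.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative helper showing the point about lastKillTime's initial value:
// epoch => eligible on the first coordinator run;
// "now" => the first kill waits a full period.
class FirstRunSketch
{
  static boolean eligible(Instant lastKillTime, Duration period, Instant now)
  {
    return !lastKillTime.plus(period).isAfter(now);
  }
}
```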
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java:
##########
@@ -42,16 +43,21 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment
@JsonIgnore
private final Integer limit;
+ @JsonIgnore
Review Comment:
Unrelated to this PR:
This is a weird style that seems to be used in some task actions. Not sure
why we even need `@JsonIgnore` here when we are already exposing a getter
annotated with `@JsonProperty`. Other task actions, like
`SegmentAllocateAction` don't do this.
##########
docs/operations/clean-metadata-store.md:
##########
@@ -86,6 +86,8 @@ Only applies to the specified datasources in the dynamic configuration parameter
If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources.
- `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`.
- `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation that segments become eligible for deletion.
+- `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator will consider all unused segments as eligible to be killed.
+- `druid.coordinator.kill.bufferPeriod`: Defines the amount of time that a segment must be unused before it is able to be permanently removed from metadata and deep storage. This serves as a buffer period to prevent data loss if data ends up being needed after being marked unused.
Review Comment:
Thanks for adding these in the docs!
```suggestion
- `druid.coordinator.kill.bufferPeriod`: Defines the amount of time that a segment must be unused before it can be permanently removed from metadata and deep storage. This serves as a buffer period to prevent data loss if data ends up being needed after being marked unused.
```
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -127,11 +128,23 @@ Collection<DataSegment> retrieveUsedSegmentsForIntervals(
   );

   /**
-   * see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)}
+   * See {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer, DateTime)}.
    */
   default List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval)
   {
-    return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
+    return retrieveUnusedSegmentsForInterval(dataSource, interval, null, null);
+  }
+
+  /**
+   * See {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer, DateTime)}.
+   */
+  default List<DataSegment> retrieveUnusedSegmentsForInterval(
Review Comment:
We should maybe get rid of some of these methods as they seem to be used
only in tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]