This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch 29.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/29.0.0 by this push:
new 4b60c7eea74 Kill tasks honor the buffer period of unused segments (#15710) (#15811)
4b60c7eea74 is described below
commit 4b60c7eea74851d46d8fd17f8e87ed0b16c6448b
Author: Laksh Singla <[email protected]>
AuthorDate: Wed Jan 31 14:31:20 2024 +0530
Kill tasks honor the buffer period of unused segments (#15710) (#15811)
* Kill tasks should honor the buffer period of unused segments.
- The coordinator duty KillUnusedSegments determines an umbrella interval
for each datasource to determine the kill interval. There can be multiple unused
segments in an umbrella interval with different used_status_last_updated timestamps.
For example, consider an unused segment that is 30 days old and one that is 1 hour old.
Currently the kill task after the 30-day mark would kill both the unused segments and
not retain the 1-hour old one.
- However, when a kill task is instantiated with this umbrella interval, it’d kill
all the unused segments regardless of the last updated timestamp. We need kill
tasks and RetrieveUnusedSegmentsAction to honor the bufferPeriod to avoid killing
unused segments in the kill interval prematurely.
* Clarify default behavior in docs.
* test comments
* fix canDutyRun()
* small updates.
* checkstyle
* forbidden api fix
* doc fix, unused import, codeql scan error, and cleanup logs.
* Address review comments
* Rename maxUsedFlagLastUpdatedTime to maxUsedStatusLastUpdatedTime
This is consistent with the column name `used_status_last_updated`.
* Apply suggestions from code review
* Make period Duration type
* Remove older variants of runKillTask() in OverlordClient interface
* Test can now run without waiting for canDutyRun().
* Remove previous variants of retrieveUnusedSegments from internal metadata
storage coordinator interface.
Removes the following interface methods in favor of a new method added:
- retrieveUnusedSegmentsForInterval(String, Interval)
- retrieveUnusedSegmentsForInterval(String, Interval, Integer)
* Chain stream operations
* cleanup
* Pass in the lastUpdatedTime to markUnused test function and remove sleep.
---------
Co-authored-by: Abhishek Radhakrishnan <[email protected]>
Co-authored-by: Kashif Faraz <[email protected]>
---
docs/data-management/delete.md | 10 +-
docs/operations/clean-metadata-store.md | 11 +-
.../actions/RetrieveUnusedSegmentsAction.java | 18 +-
.../druid/indexing/common/task/ArchiveTask.java | 2 +-
.../common/task/KillUnusedSegmentsTask.java | 39 +-
.../druid/indexing/common/task/MoveTask.java | 2 +-
.../druid/indexing/common/task/RestoreTask.java | 2 +-
.../actions/RetrieveSegmentsActionsTest.java | 20 +-
...ClientKillUnusedSegmentsTaskQuerySerdeTest.java | 37 +-
.../common/task/KillUnusedSegmentsTaskTest.java | 495 +++++++++++++++++++--
.../druid/indexing/overlord/TaskLifecycleTest.java | 20 +-
.../overlord/http/OverlordResourceTest.java | 2 +-
.../TestIndexerMetadataStorageCoordinator.java | 28 +-
.../ClientKillUnusedSegmentsTaskQuery.java | 35 +-
.../IndexerMetadataStorageCoordinator.java | 16 +-
.../IndexerSQLMetadataStorageCoordinator.java | 22 +-
.../druid/metadata/SegmentsMetadataManager.java | 18 +-
.../druid/metadata/SqlSegmentsMetadataManager.java | 9 +-
.../druid/metadata/SqlSegmentsMetadataQuery.java | 56 ++-
.../apache/druid/rpc/indexing/OverlordClient.java | 34 +-
.../coordinator/duty/KillUnusedSegments.java | 117 +++--
.../druid/server/http/DataSourcesResource.java | 2 +-
.../ClientKillUnusedSegmentsTaskQueryTest.java | 3 +-
.../IndexerSQLMetadataStorageCoordinatorTest.java | 213 +++++++--
.../druid/rpc/indexing/OverlordClientImplTest.java | 1 +
.../coordinator/duty/KillUnusedSegmentsTest.java | 56 +--
.../simulate/TestSegmentsMetadataManager.java | 2 +-
.../druid/server/http/DataSourcesResourceTest.java | 2 +-
28 files changed, 980 insertions(+), 292 deletions(-)
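
The cutoff described in the commit message can be illustrated with a minimal
sketch. It is not the code from this commit: the class BufferPeriodSketch, the
nested UnusedSegment type, and the selectKillable method are hypothetical
names, and deriving the cutoff as "now minus bufferPeriod" is an assumption
based on the description of druid.coordinator.kill.bufferPeriod. The sketch
only shows the intended rule: a segment is selected when it was marked unused
no later than maxUsedStatusLastUpdatedTime, and a null cutoff selects every
unused segment.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class BufferPeriodSketch
{
  /** Hypothetical stand-in for an unused segment row; not a Druid class. */
  static final class UnusedSegment
  {
    final String id;
    final Instant usedStatusLastUpdated;

    UnusedSegment(String id, Instant usedStatusLastUpdated)
    {
      this.id = id;
      this.usedStatusLastUpdated = usedStatusLastUpdated;
    }
  }

  /**
   * Returns the ids of segments that were marked unused no later than the cutoff.
   * A null cutoff means "no cutoff", so every unused segment is selected.
   */
  static List<String> selectKillable(List<UnusedSegment> unused, Instant maxUsedStatusLastUpdatedTime)
  {
    final List<String> killable = new ArrayList<>();
    for (UnusedSegment segment : unused) {
      if (maxUsedStatusLastUpdatedTime == null
          || !segment.usedStatusLastUpdated.isAfter(maxUsedStatusLastUpdatedTime)) {
        killable.add(segment.id);
      }
    }
    return killable;
  }

  public static void main(String[] args)
  {
    // Fixed "now" so the example is deterministic.
    final Instant now = Instant.parse("2024-01-31T00:00:00Z");
    // Assumption: the cutoff is derived as now minus the buffer period.
    final Instant cutoff = now.minus(Duration.ofDays(7));

    final List<UnusedSegment> unused = new ArrayList<>();
    unused.add(new UnusedSegment("marked-unused-30-days-ago", now.minus(Duration.ofDays(30))));
    unused.add(new UnusedSegment("marked-unused-1-hour-ago", now.minus(Duration.ofHours(1))));

    System.out.println(selectKillable(unused, cutoff)); // only the 30-day-old segment
    System.out.println(selectKillable(unused, null));   // both segments
  }
}
```

With a seven-day buffer, the segment marked unused one hour ago is retained,
which is the case the commit message says was previously handled incorrectly.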
diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md
index 6acd2fc782b..fccb14007b9 100644
--- a/docs/data-management/delete.md
+++ b/docs/data-management/delete.md
@@ -95,9 +95,10 @@ The available grammar is:
"id": <task_id>,
"dataSource": <task_datasource>,
"interval" : <all_unused_segments_in_this_interval_will_die!>,
- "context": <task context>,
- "batchSize": <optional_batch size>,
- "limit": <the maximum number of segments to delete>
+ "context": <task_context>,
+ "batchSize": <optional_batch_size>,
+ "limit": <optional_maximum_number_of_segments_to_delete>,
+ "maxUsedStatusLastUpdatedTime":
<optional_maximum_timestamp_when_segments_were_marked_as_unused>
}
```
@@ -106,7 +107,8 @@ Some of the parameters used in the task payload are further
explained below:
| Parameter | Default | Explanation
|
|-------------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `batchSize` |100 | Maximum number of segments that are deleted in one
kill batch. Some operations on the Overlord may get stuck while a `kill` task
is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus,
a `kill` task splits the list of unused segments to be deleted into smaller
batches to yield the Overlord resources intermittently to other task
operations.|
-| `limit` | null - no limit | Maximum number of segments for the kill task
to delete.|
+| `limit` | null (no limit) | Maximum number of segments for the kill task
to delete.|
+| `maxUsedStatusLastUpdatedTime` | null (no cutoff) | Maximum timestamp used
as a cutoff to include unused segments. The kill task only considers segments
which lie in the specified `interval` and were marked as unused no later than
this time. The default behavior is to kill all unused segments in the
`interval` regardless of when they were marked as unused.|
**WARNING:** The `kill` task permanently removes all information about the
affected segments from the metadata store and
diff --git a/docs/operations/clean-metadata-store.md
b/docs/operations/clean-metadata-store.md
index 202b27805ed..b30374123ee 100644
--- a/docs/operations/clean-metadata-store.md
+++ b/docs/operations/clean-metadata-store.md
@@ -86,6 +86,8 @@ Only applies to the specified datasources in the dynamic
configuration parameter
If `killDataSourceWhitelist` is not set or empty, then kill tasks can be
submitted for all datasources.
- `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601
format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job
to check for and delete eligible segments. Defaults to `P1D`. Must be greater
than `druid.coordinator.period.indexingPeriod`.
- `druid.coordinator.kill.durationToRetain`: Defines the retention period in
[ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after
creation that segments become eligible for deletion.
+- `druid.coordinator.kill.ignoreDurationToRetain`: A way to override
`druid.coordinator.kill.durationToRetain`. When enabled, the coordinator
considers all unused segments as eligible to be killed.
+- `druid.coordinator.kill.bufferPeriod`: Defines the amount of time that a
segment must be unused before it can be permanently removed from metadata and
deep storage. This serves as a buffer period to prevent data loss if data ends
up being needed after being marked unused.
- `druid.coordinator.kill.maxSegments`: Defines the maximum number of segments
to delete per kill task.
### Audit records
@@ -189,15 +191,15 @@ druid.coordinator.kill.datasource.on=false
<a name="example"></a>
## Example configuration for automated metadata cleanup
-Consider a scenario where you have scripts to create and delete hundreds of
datasources and related entities a day. You do not want to fill your metadata
store with leftover records. The datasources and related entities tend to
persist for only one or two days. Therefore, you want to run a cleanup job that
identifies and removes leftover records that are at least four days old. The
exception is for audit logs, which you need to retain for 30 days:
+Consider a scenario where you have scripts to create and delete hundreds of
datasources and related entities a day. You do not want to fill your metadata
store with leftover records. The datasources and related entities tend to
persist for only one or two days. Therefore, you want to run a cleanup job that
identifies and removes leftover records that are at least four days old after a
seven day buffer period in case you want to recover the data. The exception is
for audit logs, which you [...]
```properties
...
# Schedule the metadata management store task for every hour:
-druid.coordinator.period.metadataStoreManagementPeriod=P1H
+druid.coordinator.period.metadataStoreManagementPeriod=PT1H
-# Set a kill task to poll every day to delete Segment records and segments
-# in deep storage > 4 days old. When druid.coordinator.kill.on is set to true,
+# Set a kill task to poll every day to delete segment records and segments
+# in deep storage > 4 days old after a 7-day buffer period. When
druid.coordinator.kill.on is set to true,
# you can set killDataSourceWhitelist in the dynamic configuration to limit
# the datasources that can be killed.
# Required also for automated cleanup of rules and compaction configuration.
@@ -205,6 +207,7 @@ druid.coordinator.period.metadataStoreManagementPeriod=P1H
druid.coordinator.kill.on=true
druid.coordinator.kill.period=P1D
druid.coordinator.kill.durationToRetain=P4D
+druid.coordinator.kill.bufferPeriod=P7D
druid.coordinator.kill.maxSegments=1000
# Poll every day to delete audit records > 30 days old
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
index 150648858c1..bb188952966 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -42,16 +43,21 @@ public class RetrieveUnusedSegmentsAction implements
TaskAction<List<DataSegment
@JsonIgnore
private final Integer limit;
+ @JsonIgnore
+ private final DateTime maxUsedStatusLastUpdatedTime;
+
@JsonCreator
public RetrieveUnusedSegmentsAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
- @JsonProperty("limit") @Nullable Integer limit
+ @JsonProperty("limit") @Nullable Integer limit,
+ @JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime
maxUsedStatusLastUpdatedTime
)
{
this.dataSource = dataSource;
this.interval = interval;
this.limit = limit;
+ this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
}
@JsonProperty
@@ -73,6 +79,13 @@ public class RetrieveUnusedSegmentsAction implements
TaskAction<List<DataSegment
return limit;
}
+ @Nullable
+ @JsonProperty
+ public DateTime getMaxUsedStatusLastUpdatedTime()
+ {
+ return maxUsedStatusLastUpdatedTime;
+ }
+
@Override
public TypeReference<List<DataSegment>> getReturnTypeReference()
{
@@ -83,7 +96,7 @@ public class RetrieveUnusedSegmentsAction implements
TaskAction<List<DataSegment
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
- .retrieveUnusedSegmentsForInterval(dataSource, interval, limit);
+ .retrieveUnusedSegmentsForInterval(dataSource, interval, limit,
maxUsedStatusLastUpdatedTime);
}
@Override
@@ -99,6 +112,7 @@ public class RetrieveUnusedSegmentsAction implements
TaskAction<List<DataSegment
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", limit=" + limit +
+ ", maxUsedStatusLastUpdatedTime=" + maxUsedStatusLastUpdatedTime +
'}';
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
index 42d04316a41..957317ad93c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
@@ -79,7 +79,7 @@ public class ArchiveTask extends AbstractFixedIntervalTask
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
- .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(),
myLock.getInterval(), null));
+ .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(),
myLock.getInterval(), null, null));
// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 54fae94684f..cbf4a84ba79 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -63,9 +63,10 @@ import java.util.stream.Collectors;
/**
* The client representation of this task is {@link
ClientKillUnusedSegmentsTaskQuery}.
* JSON serialization fields of this class must correspond to those of {@link
- * ClientKillUnusedSegmentsTaskQuery}, except for "id" and "context" fields.
+ * ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link
#context} fields.
* <p>
* The field {@link #isMarkAsUnused()} is now deprecated.
+ * </p>
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
@@ -95,6 +96,12 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
*/
@Nullable private final Integer limit;
+ /**
+ * The maximum used status last updated time. Any segments with
+ * {@code used_status_last_updated} no later than this time will be included
in the kill task.
+ */
+ @Nullable private final DateTime maxUsedStatusLastUpdatedTime;
+
@JsonCreator
public KillUnusedSegmentsTask(
@JsonProperty("id") String id,
@@ -103,7 +110,8 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize,
- @JsonProperty("limit") @Nullable Integer limit
+ @JsonProperty("limit") @Nullable Integer limit,
+ @JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime
maxUsedStatusLastUpdatedTime
)
{
super(
@@ -115,15 +123,16 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize :
DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
if (this.batchSize <= 0) {
- throw InvalidInput.exception("batchSize[%d] must be a positive
integer.", limit);
+ throw InvalidInput.exception("batchSize[%d] must be a positive
integer.", batchSize);
}
if (limit != null && limit <= 0) {
- throw InvalidInput.exception("Limit[%d] must be a positive integer.",
limit);
+ throw InvalidInput.exception("limit[%d] must be a positive integer.",
limit);
}
if (limit != null && Boolean.TRUE.equals(markAsUnused)) {
- throw InvalidInput.exception("Limit cannot be provided when markAsUnused
is enabled.");
+ throw InvalidInput.exception("limit[%d] cannot be provided when
markAsUnused is enabled.", limit);
}
this.limit = limit;
+ this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
}
/**
@@ -155,6 +164,13 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
return limit;
}
+ @Nullable
+ @JsonProperty
+ public DateTime getMaxUsedStatusLastUpdatedTime()
+ {
+ return maxUsedStatusLastUpdatedTime;
+ }
+
@Override
public String getType()
{
@@ -180,7 +196,8 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit(
new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
);
- LOG.info("Marked [%d] segments as unused.", numSegmentsMarkedAsUnused);
+ LOG.info("Marked [%d] segments of datasource[%s] in interval[%s] as
unused.",
+ numSegmentsMarkedAsUnused, getDataSource(), getInterval());
} else {
numSegmentsMarkedAsUnused = 0;
}
@@ -190,9 +207,13 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
@Nullable Integer numTotalBatches = getNumTotalBatches();
List<DataSegment> unusedSegments;
LOG.info(
- "Starting kill with batchSize[%d], up to limit[%d] segments will be
deleted%s",
+ "Starting kill for datasource[%s] in interval[%s] with batchSize[%d],
up to limit[%d] segments "
+ + "before maxUsedStatusLastUpdatedTime[%s] will be deleted%s",
+ getDataSource(),
+ getInterval(),
batchSize,
limit,
+ maxUsedStatusLastUpdatedTime,
numTotalBatches != null ? StringUtils.format(" in [%d] batches.",
numTotalBatches) : "."
);
@@ -217,7 +238,9 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
unusedSegments = toolbox
.getTaskActionClient()
- .submit(new RetrieveUnusedSegmentsAction(getDataSource(),
getInterval(), nextBatchSize));
+ .submit(
+ new RetrieveUnusedSegmentsAction(getDataSource(),
getInterval(), nextBatchSize, maxUsedStatusLastUpdatedTime
+ ));
// Fetch locks each time as a revokal could have occurred in between
batches
final NavigableMap<DateTime, List<TaskLock>> taskLockMap
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
index d23b3820db7..bea3685ca24 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
@@ -87,7 +87,7 @@ public class MoveTask extends AbstractFixedIntervalTask
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
- .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(),
myLock.getInterval(), null));
+ .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(),
myLock.getInterval(), null, null));
// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
index 1364bcb597f..cc635383fed 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
@@ -80,7 +80,7 @@ public class RestoreTask extends AbstractFixedIntervalTask
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
- .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(),
myLock.getInterval(), null));
+ .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(),
myLock.getInterval(), null, null));
// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
index 09f724bcfc1..2dee594aad6 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
@@ -20,8 +20,10 @@
package org.apache.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -103,7 +105,23 @@ public class RetrieveSegmentsActionsTest
@Test
public void testRetrieveUnusedSegmentsAction()
{
- final RetrieveUnusedSegmentsAction action = new
RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null);
+ final RetrieveUnusedSegmentsAction action = new
RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, null);
+ final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task,
actionTestKit.getTaskActionToolbox()));
+ Assert.assertEquals(expectedUnusedSegments, resultSegments);
+ }
+
+ @Test
+ public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime()
+ {
+ final RetrieveUnusedSegmentsAction action = new
RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null,
DateTimes.MIN);
+ final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task,
actionTestKit.getTaskActionToolbox()));
+ Assert.assertEquals(ImmutableSet.of(), resultSegments);
+ }
+
+ @Test
+ public void testRetrieveUnusedSegmentsActionWithNowUsedLastUpdatedTime()
+ {
+ final RetrieveUnusedSegmentsAction action = new
RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null,
DateTimes.nowUtc());
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task,
actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, resultSegments);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
index 3ab6bae4688..7e7c9088d61 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.junit.Assert;
import org.junit.Before;
@@ -53,7 +54,8 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Intervals.of("2020-01-01/P1D"),
false,
99,
- 5
+ 5,
+ DateTimes.nowUtc()
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask)
objectMapper.readValue(json, Task.class);
@@ -63,7 +65,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(taskQuery.getMarkAsUnused(),
fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getBatchSize(),
Integer.valueOf(fromJson.getBatchSize()));
Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());
-
+ Assert.assertEquals(taskQuery.getMaxUsedStatusLastUpdatedTime(),
fromJson.getMaxUsedStatusLastUpdatedTime());
}
@Test
@@ -75,6 +77,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Intervals.of("2020-01-01/P1D"),
true,
null,
+ null,
null
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
@@ -85,6 +88,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(taskQuery.getMarkAsUnused(),
fromJson.isMarkAsUnused());
Assert.assertEquals(100, fromJson.getBatchSize());
Assert.assertNull(taskQuery.getLimit());
+ Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime());
}
@Test
@@ -97,6 +101,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
null,
true,
99,
+ null,
null
);
final byte[] json = objectMapper.writeValueAsBytes(task);
@@ -110,5 +115,33 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()),
taskQuery.getBatchSize());
Assert.assertNull(task.getLimit());
+ Assert.assertNull(task.getMaxUsedStatusLastUpdatedTime());
+ }
+
+ @Test
+ public void
testKillUnusedSegmentsTaskWithNonNullValuesToClientKillUnusedSegmentsTaskQuery()
throws IOException
+ {
+ final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask(
+ null,
+ "datasource",
+ Intervals.of("2020-01-01/P1D"),
+ null,
+ null,
+ 99,
+ 100,
+ DateTimes.nowUtc()
+ );
+ final byte[] json = objectMapper.writeValueAsBytes(task);
+ final ClientKillUnusedSegmentsTaskQuery taskQuery =
(ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
+ json,
+ ClientTaskQuery.class
+ );
+ Assert.assertEquals(task.getId(), taskQuery.getId());
+ Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
+ Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
+ Assert.assertNull(taskQuery.getMarkAsUnused());
+ Assert.assertEquals(Integer.valueOf(task.getBatchSize()),
taskQuery.getBatchSize());
+ Assert.assertEquals(task.getLimit(), taskQuery.getLimit());
+ Assert.assertEquals(task.getMaxUsedStatusLastUpdatedTime(),
taskQuery.getMaxUsedStatusLastUpdatedTime());
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index d77f2e6c243..e2c433536a2 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.TaskReport;
@@ -29,8 +31,11 @@ import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.timeline.DataSegment;
import org.assertj.core.api.Assertions;
+import org.hamcrest.MatcherAssert;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
@@ -40,6 +45,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
public class KillUnusedSegmentsTaskTest extends IngestionTestBase
{
@@ -86,18 +92,24 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
null,
false,
null,
+ null,
null
);
Assert.assertEquals(TaskState.SUCCESS,
taskRunner.run(task).get().getStatusCode());
- final List<DataSegment> unusedSegments =
-
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE,
Intervals.of("2019/2020"));
+ final List<DataSegment> unusedSegments =
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.of("2019/2020"),
+ null,
+ null
+ );
Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"),
version)), unusedSegments);
- Assertions.assertThat(
- getMetadataStorageCoordinator()
- .retrieveUsedSegmentsForInterval(DATA_SOURCE,
Intervals.of("2019/2020"), Segments.ONLY_VISIBLE)
+
Assertions.assertThat(getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.of("2019/2020"),
+ Segments.ONLY_VISIBLE)
).containsExactlyInAnyOrder(
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
@@ -135,13 +147,19 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
null,
true,
null,
+ null,
null
);
Assert.assertEquals(TaskState.SUCCESS,
taskRunner.run(task).get().getStatusCode());
final List<DataSegment> unusedSegments =
-
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE,
Intervals.of("2019/2020"));
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.of("2019/2020"),
+ null,
+ null
+ );
Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"),
version)), unusedSegments);
Assertions.assertThat(
@@ -166,6 +184,7 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
null,
true,
null,
+ null,
null
);
Assert.assertTrue(task.getInputSourceResources().isEmpty());
@@ -176,10 +195,10 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
{
final String version = DateTimes.nowUtc().toString();
final Set<DataSegment> segments = ImmutableSet.of(
- newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
- newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
- newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
- newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
+ newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
+ newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
+ newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
+ newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced =
getMetadataStorageCoordinator().commitSegments(segments);
@@ -194,58 +213,374 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
);
final KillUnusedSegmentsTask task =
- new KillUnusedSegmentsTask(
- null,
- DATA_SOURCE,
- Intervals.of("2018-01-01/2020-01-01"),
- null,
- false,
- 1,
- 4
- );
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ 1,
+ 4,
+ null
+ );
Assert.assertEquals(TaskState.SUCCESS,
taskRunner.run(task).get().getStatusCode());
// we expect ALL tasks to be deleted
final List<DataSegment> unusedSegments =
-
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE,
Intervals.of("2019/2020"));
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.of("2019/2020"),
+ null,
+ null
+ );
Assert.assertEquals(Collections.emptyList(), unusedSegments);
Assert.assertEquals(new KillTaskReport.Stats(4, 4, 0), getReportedStats());
}
+ /**
+ * Test kill functionality of multiple unused segments in a wide interval
with different {@code used_status_last_updated}
+ * timestamps. A kill task submitted with null {@code
maxUsedStatusLastUpdatedTime} will kill all the unused segments in the kill
+ * interval.
+ */
+ @Test
+ public void
testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime() throws
Exception
+ {
+ final String version = DateTimes.nowUtc().toString();
+ final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version);
+ final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version);
+ final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version);
+ final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version);
+
+ final Set<DataSegment> segments = ImmutableSet.of(segment1, segment2, segment3, segment4);
+ final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);
+
+ Assert.assertEquals(segments, announced);
+
+ Assert.assertEquals(
+ 1,
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ segment1.getInterval()
+ )
+ );
+
+ Assert.assertEquals(
+ 1,
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ segment4.getInterval()
+ )
+ );
+
+ Assert.assertEquals(
+ 1,
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ segment3.getInterval()
+ )
+ );
+
+ final List<Interval> segmentIntervals = segments.stream()
+ .map(DataSegment::getInterval)
+ .collect(Collectors.toList());
+
+ final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals);
+
+
+ final KillUnusedSegmentsTask task =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ false,
+ 1,
+ 10,
+ null
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
+
+ final List<DataSegment> unusedSegments =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ null
+ );
+
+ Assert.assertEquals(ImmutableList.of(), unusedSegments);
+ Assert.assertEquals(new KillTaskReport.Stats(3, 4, 0), getReportedStats());
+ }
+
+ /**
+ * Test kill functionality of multiple unused segments in a wide interval with different {@code used_status_last_updated}
+ * timestamps. Consider:
+ * <li> {@code segment1}, {@code segment2} and {@code segment3} have t1, t2 and t3 {@code used_status_last_updated} timestamps
+ * respectively, where t1 < t2 < t3 </li>
+ * <li> {@code segment4} is a used segment and therefore shouldn't be killed </li>
+ *
+ * <p>
+ * A kill task submitted with t2 as the {@code maxUsedStatusLastUpdatedTime} should only kill {@code segment1} and {@code segment2}
+ * After that, a kill task submitted with t3 as the {@code maxUsedStatusLastUpdatedTime} should kill {@code segment3}.
+ * </p>
+ */
+ @Test
+ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime() throws Exception
+ {
+ final String version = DateTimes.nowUtc().toString();
+ final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version);
+ final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version);
+ final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version);
+ final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version);
+
+ final Set<DataSegment> segments = ImmutableSet.of(segment1, segment2, segment3, segment4);
+ final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);
+
+ Assert.assertEquals(segments, announced);
+
+ Assert.assertEquals(
+ 1,
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ segment1.getInterval()
+ )
+ );
+
+ Assert.assertEquals(
+ 1,
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ segment4.getInterval()
+ )
+ );
+
+ // Capture the last updated time cutoff
+ final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc();
+
+ // Delay for 1s, mark the segments as unused and then capture the last updated time cutoff again
+ Thread.sleep(1000);
+
+ // now mark the third segment as unused
+ Assert.assertEquals(
+ 1,
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ segment3.getInterval()
+ )
+ );
+
+ final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc();
+
+
+ final List<Interval> segmentIntervals = segments.stream()
+ .map(DataSegment::getInterval)
+ .collect(Collectors.toList());
+
+ final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals);
+
+ final KillUnusedSegmentsTask task1 =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ false,
+ 1,
+ 10,
+ maxUsedStatusLastUpdatedTime1
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task1).get().getStatusCode());
+
+ final List<DataSegment> unusedSegments =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ null
+ );
+
+ Assert.assertEquals(ImmutableList.of(segment3), unusedSegments);
+ Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats());
+
+ final KillUnusedSegmentsTask task2 =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ false,
+ 1,
+ 10,
+ maxUsedStatusLastUpdatedTime2
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode());
+
+ final List<DataSegment> unusedSegments2 =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ null
+ );
+
+ Assert.assertEquals(ImmutableList.of(), unusedSegments2);
+ Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats());
+ }
+
+ /**
+ * Similar to {@link #testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime()}, but with a different setup.
+ * <p>
+ * Tests kill functionality of multiple unused segments in a wide interval with different {@code used_status_last_updated}
+ * timestamps. Consider:
+ * <li> {@code segment1} and {@code segment4} have t1 {@code used_status_last_updated} timestamp
+ * <li> {@code segment2} and {@code segment3} have t2 {@code used_status_last_updated} timestamp, where t1 < t2 </li>
+ *
+ * <p>
+ * A kill task submitted with t1 as the {@code maxUsedStatusLastUpdatedTime} should only kill {@code segment1} and {@code segment4}
+ * After that, a kill task submitted with t2 as the {@code maxUsedStatusLastUpdatedTime} should kill {@code segment2} and {@code segment3}.
+ * </p>
+ */
+ @Test
+ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime2() throws Exception
+ {
+ final String version = DateTimes.nowUtc().toString();
+ final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version);
+ final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version);
+ final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version);
+ final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version);
+
+ final Set<DataSegment> segments = ImmutableSet.of(segment1, segment2, segment3, segment4);
+ final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);
+
+ Assert.assertEquals(segments, announced);
+
+ Assert.assertEquals(
+ 2,
+ getSegmentsMetadataManager().markSegmentsAsUnused(
+ ImmutableSet.of(
+ segment1.getId(),
+ segment4.getId()
+ )
+ )
+ );
+
+ final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc();
+
+ // Delay for 1s, mark the segments as unused and then capture the last updated time cutoff again
+ Thread.sleep(1000);
+
+ Assert.assertEquals(
+ 2,
+ getSegmentsMetadataManager().markSegmentsAsUnused(
+ ImmutableSet.of(
+ segment2.getId(),
+ segment3.getId()
+ )
+ )
+ );
+
+ final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc();
+
+
+ final List<Interval> segmentIntervals = segments.stream()
+ .map(DataSegment::getInterval)
+ .collect(Collectors.toList());
+
+ final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals);
+
+
+ final KillUnusedSegmentsTask task1 =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ false,
+ 1,
+ 10,
+ maxUsedStatusLastUpdatedTime1
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task1).get().getStatusCode());
+
+ final List<DataSegment> unusedSegments =
+ getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ null
+ );
+
+ Assert.assertEquals(ImmutableList.of(segment2, segment3), unusedSegments);
+ Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats());
+
+ final KillUnusedSegmentsTask task2 =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ false,
+ 1,
+ 10,
+ maxUsedStatusLastUpdatedTime2
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode());
+
+ final List<DataSegment> unusedSegments2 = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ umbrellaInterval,
+ null,
+ null
+ );
+
+ Assert.assertEquals(ImmutableList.of(), unusedSegments2);
+ Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats());
+ }
+
@Test
public void testKillBatchSizeThree() throws Exception
{
final String version = DateTimes.nowUtc().toString();
final Set<DataSegment> segments = ImmutableSet.of(
- newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
- newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
- newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
- newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
+ newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
+ newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
+ newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
+ newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);
Assert.assertEquals(segments, announced);
final KillUnusedSegmentsTask task =
- new KillUnusedSegmentsTask(
- null,
- DATA_SOURCE,
- Intervals.of("2018-01-01/2020-01-01"),
- null,
- true,
- 3,
- null
- );
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ true,
+ 3,
+ null,
+ null
+ );
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
// we expect ALL tasks to be deleted
- final List<DataSegment> unusedSegments =
- getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
+ final List<DataSegment> unusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
+ DATA_SOURCE,
+ Intervals.of("2019/2020"),
+ null,
+ null
+ );
Assert.assertEquals(Collections.emptyList(), unusedSegments);
@@ -263,6 +598,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
null,
+ null,
null
);
Assert.assertEquals(100, task.computeNextBatchSize(50));
@@ -279,7 +615,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
10,
- 5
+ 5,
+ null
);
Assert.assertEquals(5, task.computeNextBatchSize(0));
}
@@ -295,7 +632,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
5,
- 10
+ 10,
+ null
);
Assert.assertEquals(5, task.computeNextBatchSize(0));
}
@@ -311,7 +649,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
5,
- 10
+ 10,
+ null
);
Assert.assertEquals(3, task.computeNextBatchSize(7));
}
@@ -327,6 +666,7 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
null,
+ null,
null
);
Assert.assertNull(task.getNumTotalBatches());
@@ -343,11 +683,81 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
10,
- 5
+ 5,
+ null
);
Assert.assertEquals(1, (int) task.getNumTotalBatches());
}
+ @Test
+ public void testInvalidLimit()
+ {
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ DruidException.class,
+ () -> new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ 10,
+ 0,
+ null
+ )
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ "limit[0] must be a positive integer."
+ )
+ );
+ }
+
+ @Test
+ public void testInvalidBatchSize()
+ {
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ DruidException.class,
+ () -> new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ 0,
+ 10,
+ null
+ )
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ "batchSize[0] must be a positive integer."
+ )
+ );
+ }
+
+ @Test
+ public void testInvalidMarkAsUnusedWithLimit()
+ {
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ DruidException.class,
+ () -> new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ true,
+ 10,
+ 10,
+ null
+ )
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ "limit[10] cannot be provided when markAsUnused is enabled."
+ )
+ );
+ }
+
@Test
public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit()
{
@@ -359,7 +769,8 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
null,
false,
5,
- 10
+ 10,
+ null
);
Assert.assertEquals(2, (int) task.getNumTotalBatches());
}
@@ -387,7 +798,9 @@ public class KillUnusedSegmentsTaskTest extends IngestionTestBase
try {
Object payload = getObjectMapper().readValue(
taskRunner.getTaskReportsFile(),
- new TypeReference<Map<String, TaskReport>>() { }
+ new TypeReference<Map<String, TaskReport>>()
+ {
+ }
).get(KillTaskReport.REPORT_KEY).getPayload();
return getObjectMapper().convertValue(payload, KillTaskReport.Stats.class);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 94d21c144dc..002c70262cb 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -903,7 +903,13 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
// manually create local segments files
List<File> segmentFiles = new ArrayList<>();
- for (DataSegment segment : mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"))) {
+ final List<DataSegment> unusedSegments = mdc.retrieveUnusedSegmentsForInterval(
+ "test_kill_task",
+ Intervals.of("2011-04-01/P4D"),
+ null,
+ null
+ );
+ for (DataSegment segment : unusedSegments) {
File file = new File((String) segment.getLoadSpec().get("path"));
FileUtils.mkdirp(file.getParentFile());
Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY);
@@ -918,6 +924,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
null,
false,
null,
+ null,
null
);
@@ -993,7 +1000,13 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
// manually create local segments files
List<File> segmentFiles = new ArrayList<>();
- for (DataSegment segment : mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"))) {
+ final List<DataSegment> unusedSegments = mdc.retrieveUnusedSegmentsForInterval(
+ "test_kill_task",
+ Intervals.of("2011-04-01/P4D"),
+ null,
+ null
+ );
+ for (DataSegment segment : unusedSegments) {
File file = new File((String) segment.getLoadSpec().get("path"));
FileUtils.mkdirp(file.getParentFile());
Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY);
@@ -1009,7 +1022,8 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
null,
false,
null,
- maxSegmentsToKill
+ maxSegmentsToKill,
+ null
);
final TaskStatus status = runTask(killUnusedSegmentsTask);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index cdaf5b9b359..6897e69c26b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -926,7 +926,7 @@ public class OverlordResourceTest
auditManager
);
- Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, false, 10, null);
+ Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, false, 10, null, null);
overlordResource.taskPost(task, req);
Assert.assertTrue(auditEntryCapture.hasCaptured());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 2fc80adceac..f42a300de5f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -34,6 +34,7 @@ import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.PartialShardSpec;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -43,7 +44,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Stream;
public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
{
@@ -104,31 +104,21 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
return ImmutableList.of();
}
- @Override
- public List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval)
- {
- synchronized (unusedSegments) {
- return ImmutableList.copyOf(unusedSegments);
- }
- }
-
@Override
public List<DataSegment> retrieveUnusedSegmentsForInterval(
String dataSource,
Interval interval,
- @Nullable Integer limit
+ @Nullable Integer limit,
+ @Nullable DateTime maxUsedStatusLastUpdatedTime
)
{
synchronized (unusedSegments) {
- Stream<DataSegment> resultStream = unusedSegments.stream();
-
- resultStream = resultStream.filter(ds -> !nuked.contains(ds));
-
- if (limit != null) {
- resultStream = resultStream.limit(limit);
- }
-
- return ImmutableList.copyOf(resultStream.iterator());
+ return ImmutableList.copyOf(
+ unusedSegments.stream()
+ .filter(ds -> !nuked.contains(ds))
+ .limit(limit != null ? limit : Long.MAX_VALUE)
+ .iterator()
+ );
}
}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
index 279c6699ff9..e5656ff3975 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
@@ -21,16 +21,17 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
+import org.apache.druid.error.InvalidInput;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Objects;
/**
- * Client representation of org.apache.druid.indexing.common.task.KillUnusedSegmentsTask. JSON searialization
- * fields of this class must correspond to those of org.apache.druid.indexing.common.task.KillUnusedSegmentsTask, except
- * for "id" and "context" fields.
+ * Client representation of {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}. JSON searialization
+ * fields of this class must correspond to those of {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask},
+ * except for {@code id} and {@code context} fields.
*/
public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
{
@@ -42,6 +43,7 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
private final Boolean markAsUnused;
private final Integer batchSize;
@Nullable private final Integer limit;
+ @Nullable private final DateTime maxUsedStatusLastUpdatedTime;
@JsonCreator
public ClientKillUnusedSegmentsTaskQuery(
@@ -50,16 +52,23 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
@JsonProperty("interval") Interval interval,
@JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize,
- @JsonProperty("limit") Integer limit
+ @JsonProperty("limit") @Nullable Integer limit,
+ @JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
)
{
- this.id = Preconditions.checkNotNull(id, "id");
+ if (id == null) {
+ throw InvalidInput.exception("kill task id cannot be null");
+ }
+ if (limit != null && limit <= 0) {
+ throw InvalidInput.exception("limit[%d] must be a positive integer.", limit);
+ }
+ this.id = id;
this.dataSource = dataSource;
this.interval = interval;
this.markAsUnused = markAsUnused;
this.batchSize = batchSize;
- Preconditions.checkArgument(limit == null || limit > 0, "limit must be > 0");
this.limit = limit;
+ this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
}
@JsonProperty
@@ -116,6 +125,13 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
return limit;
}
+ @JsonProperty
+ @Nullable
+ public DateTime getMaxUsedStatusLastUpdatedTime()
+ {
+ return maxUsedStatusLastUpdatedTime;
+ }
+
@Override
public boolean equals(Object o)
@@ -132,12 +148,13 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
&& Objects.equals(interval, that.interval)
&& Objects.equals(markAsUnused, that.markAsUnused)
&& Objects.equals(batchSize, that.batchSize)
- && Objects.equals(limit, that.limit);
+ && Objects.equals(limit, that.limit)
+ && Objects.equals(maxUsedStatusLastUpdatedTime, that.maxUsedStatusLastUpdatedTime);
}
@Override
public int hashCode()
{
- return Objects.hash(id, dataSource, interval, markAsUnused, batchSize, limit);
+ return Objects.hash(id, dataSource, interval, markAsUnused, batchSize, limit, maxUsedStatusLastUpdatedTime);
}
}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index dd0f7d8c98a..31c97533900 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -24,6 +24,7 @@ import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.PartialShardSpec;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -126,14 +127,6 @@ public interface IndexerMetadataStorageCoordinator
Segments visibility
);
- /**
- * see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)}
- */
- default List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval)
- {
- return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
- }
-
/**
* Retrieve all published segments which include ONLY data within the given interval and are marked as unused from the
* metadata store.
@@ -141,6 +134,10 @@ public interface IndexerMetadataStorageCoordinator
* @param dataSource The data source the segments belong to
* @param interval Filter the data segments to ones that include data in this interval exclusively.
* @param limit The maximum number of unused segments to retreive. If null, no limit is applied.
+ * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval}
+ * with {@code used_status_last_updated} no later than this time will be included in the
+ * kill task. Segments without {@code used_status_last_updated} time (due to an upgrade
+ * from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored
*
* @return DataSegments which include ONLY data within the requested interval and are marked as unused. Segments NOT
* returned here may include data in the interval
@@ -148,7 +145,8 @@ public interface IndexerMetadataStorageCoordinator
List<DataSegment> retrieveUnusedSegmentsForInterval(
String dataSource,
Interval interval,
- @Nullable Integer limit
+ @Nullable Integer limit,
+ @Nullable DateTime maxUsedStatusLastUpdatedTime
);
/**
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 69dca46ea1c..0ef488aed40 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -229,30 +229,34 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
- @Override
- public List<DataSegment> retrieveUnusedSegmentsForInterval(final String dataSource, final Interval interval)
- {
- return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
- }
-
@Override
public List<DataSegment> retrieveUnusedSegmentsForInterval(
String dataSource,
Interval interval,
- @Nullable Integer limit
+ @Nullable Integer limit,
+ @Nullable DateTime maxUsedStatusLastUpdatedTime
)
{
final List<DataSegment> matchingSegments = connector.inReadOnlyTransaction(
(handle, status) -> {
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
- .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit, null, null)) {
+ .retrieveUnusedSegments(
+ dataSource,
+ Collections.singletonList(interval),
+ limit,
+ null,
+ null,
+ maxUsedStatusLastUpdatedTime
+ )
+ ) {
return ImmutableList.copyOf(iterator);
}
}
);
- log.info("Found %,d unused segments for %s for interval %s.", matchingSegments.size(), dataSource, interval);
+ log.info("Found [%,d] unused segments for datasource[%s] in interval[%s] with maxUsedStatusLastUpdatedTime[%s].",
+ matchingSegments.size(), dataSource, interval, maxUsedStatusLastUpdatedTime);
return matchingSegments;
}
diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
index eb8a36bc3a7..94c2fae60fd 100644
--- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
@@ -163,17 +163,25 @@ public interface SegmentsMetadataManager
Set<String> retrieveAllDataSourceNames();
/**
- * Returns top N unused segment intervals with the start time no earlier than the specified start time (if not null)
- * and with the end time no later than the specified maxEndTime and with sed_status_last_updated time no later than
- * maxLastUsedTime when ordered by segment start time, end time. Any segment having no used_status_last_updated time
- * due to upgrade from legacy Druid means maxUsedFlagLastUpdatedTime is ignored for that segment.
+ * Returns a list of up to {@code limit} unused segment intervals for the specified datasource. Segments are filtered based on the following criteria:
+ *
+ * <li> The start time of the segment must be no earlier than the specified {@code minStartTime} (if not null). </li>
+ * <li> The end time of the segment must be no later than the specified {@code maxEndTime}. </li>
+ * <li> The {@code used_status_last_updated} time of the segment must be no later than {@code maxUsedStatusLastUpdatedTime}.
+ * Segments that have no {@code used_status_last_updated} time (due to an upgrade from legacy Druid) will
+ * have {@code maxUsedStatusLastUpdatedTime} ignored. </li>
+ *
+ * <p>
+ * The list of intervals is ordered by segment start time and then by end time.
+ * </p>
*/
List<Interval> getUnusedSegmentIntervals(
String dataSource,
DateTime minStartTime,
DateTime maxEndTime,
int limit,
- DateTime maxUsedFlagLastUpdatedTime);
+ DateTime maxUsedStatusLastUpdatedTime
+ );
@VisibleForTesting
void poll();
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 12d43ec5b76..0b423006152 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -687,7 +687,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
}
try (final CloseableIterator<DataSegment> iterator =
- queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null)) {
+ queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null, null)) {
while (iterator.hasNext()) {
final DataSegment dataSegment = iterator.next();
timeline.addSegments(Iterators.singletonIterator(dataSegment));
@@ -994,7 +994,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
? Intervals.ONLY_ETERNITY
: Collections.singletonList(interval);
try (final CloseableIterator<DataSegment> iterator =
- queryTool.retrieveUnusedSegments(datasource, intervals, limit, lastSegmentId, sortOrder)) {
+ queryTool.retrieveUnusedSegments(datasource, intervals, limit, lastSegmentId, sortOrder, null)) {
return ImmutableList.copyOf(iterator);
}
}
@@ -1138,7 +1138,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
@Nullable final DateTime minStartTime,
final DateTime maxEndTime,
final int limit,
- DateTime maxUsedFlagLastUpdatedTime
+ DateTime maxUsedStatusLastUpdatedTime
)
{
// Note that we handle the case where used_status_last_updated IS NULL here to allow smooth transition to Druid version that uses used_status_last_updated column
@@ -1162,7 +1162,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
.setMaxRows(limit)
.bind("dataSource", dataSource)
.bind("end", maxEndTime.toString())
- .bind("used_status_last_updated", maxUsedFlagLastUpdatedTime.toString())
+ .bind("used_status_last_updated", maxUsedStatusLastUpdatedTime.toString())
.map(
new BaseResultSetMapper<Interval>()
{
@@ -1182,7 +1182,6 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
Iterator<Interval> iter = sql.iterator();
-
List<Interval> result = Lists.newArrayListWithCapacity(limit);
for (int i = 0; i < limit && iter.hasNext(); i++) {
try {
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 545acb84507..3bd7c48ad05 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
@@ -117,7 +118,16 @@ public class SqlSegmentsMetadataQuery
final Collection<Interval> intervals
)
{
- return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null, null, null);
+ return retrieveSegments(
+ dataSource,
+ intervals,
+ IntervalMode.OVERLAPS,
+ true,
+ null,
+ null,
+ null,
+ null
+ );
}
/**
@@ -135,6 +145,10 @@ public class SqlSegmentsMetadataQuery
* lexigraphically if sortOrder is DESC.
* @param sortOrder Specifies the order with which to return the matching segments by start time, end time.
* A null value indicates that order does not matter.
+ * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code intervals}
+ * with {@code used_status_last_updated} no later than this time will be included in the
+ * iterator. Segments without {@code used_status_last_updated} time (due to an upgrade
+ * from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored
* Returns a closeable iterator. You should close it when you are done.
*/
@@ -143,10 +157,20 @@ public class SqlSegmentsMetadataQuery
final Collection<Interval> intervals,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
- @Nullable final SortOrder sortOrder
+ @Nullable final SortOrder sortOrder,
+ @Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
- return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit, lastSegmentId, sortOrder);
+ return retrieveSegments(
+ dataSource,
+ intervals,
+ IntervalMode.CONTAINS,
+ false,
+ limit,
+ lastSegmentId,
+ sortOrder,
+ maxUsedStatusLastUpdatedTime
+ );
}
/**
@@ -241,6 +265,7 @@ public class SqlSegmentsMetadataQuery
true,
null,
null,
+ null,
null
),
DataSegment::getId
@@ -379,12 +404,13 @@ public class SqlSegmentsMetadataQuery
final boolean used,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
- @Nullable final SortOrder sortOrder
+ @Nullable final SortOrder sortOrder,
+ @Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
return CloseableIterators.withEmptyBaggage(
- retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder)
+ retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
);
} else {
final List<List<Interval>> intervalsLists = Lists.partition(new
ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
@@ -399,7 +425,8 @@ public class SqlSegmentsMetadataQuery
used,
limitPerBatch,
lastSegmentId,
- sortOrder
+ sortOrder,
+ maxUsedStatusLastUpdatedTime
);
if (limitPerBatch != null) {
// If limit is provided, we need to shrink the limit for subsequent
batches or circuit break if
@@ -425,7 +452,8 @@ public class SqlSegmentsMetadataQuery
final boolean used,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
- @Nullable final SortOrder sortOrder
+ @Nullable final SortOrder sortOrder,
+ @Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
// Check if the intervals all support comparing as strings. If so, bake
them into the SQL.
@@ -438,6 +466,12 @@ public class SqlSegmentsMetadataQuery
appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode,
connector);
}
+ // Add the used_status_last_updated time filter only for unused segments when maxUsedStatusLastUpdatedTime is non-null.
+ final boolean addMaxUsedLastUpdatedTimeFilter = !used && maxUsedStatusLastUpdatedTime != null;
+ if (addMaxUsedLastUpdatedTimeFilter) {
+ sb.append(" AND (used_status_last_updated IS NOT NULL AND used_status_last_updated <= :used_status_last_updated)");
+ }
+
if (lastSegmentId != null) {
sb.append(
StringUtils.format(
@@ -462,10 +496,16 @@ public class SqlSegmentsMetadataQuery
.setFetchSize(connector.getStreamingFetchSize())
.bind("used", used)
.bind("dataSource", dataSource);
+
+ if (addMaxUsedLastUpdatedTimeFilter) {
+ sql.bind("used_status_last_updated", maxUsedStatusLastUpdatedTime.toString());
+ }
+
if (lastSegmentId != null) {
sql.bind("id", lastSegmentId);
}
- if (null != limit) {
+
+ if (limit != null) {
sql.setMaxRows(limit);
}
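
Editor's note on the SqlSegmentsMetadataQuery change above: the time filter is appended only when the query targets unused segments and a cutoff is supplied. The following is a minimal, self-contained sketch of that guard. The class name, method name, and base WHERE clause are hypothetical and exist only for illustration; only the appended condition string is taken from the diff. java.time.Instant stands in for the Joda DateTime used by Druid so that the sketch has no external dependencies.

```java
import java.time.Instant;

// Hypothetical illustration class; not part of Druid.
final class UnusedSegmentFilterSketch
{
  // Builds a WHERE-clause fragment. The base clause here is an assumption;
  // the appended time condition is copied from the diff.
  static String buildCondition(boolean used, Instant maxUsedStatusLastUpdatedTime)
  {
    final StringBuilder sb = new StringBuilder("used = :used AND dataSource = :dataSource");

    // Same guard as the diff: filter only unused segments, and only when a cutoff is given.
    final boolean addFilter = !used && maxUsedStatusLastUpdatedTime != null;
    if (addFilter) {
      sb.append(" AND (used_status_last_updated IS NOT NULL AND used_status_last_updated <= :used_status_last_updated)");
    }
    return sb.toString();
  }

  public static void main(String[] args)
  {
    final Instant cutoff = Instant.parse("2024-01-01T00:00:00Z");
    System.out.println(buildCondition(false, cutoff)); // unused segments with a cutoff: filter appended
    System.out.println(buildCondition(true, cutoff));  // used segments: cutoff has no effect
    System.out.println(buildCondition(false, null));   // no cutoff: no filter
  }
}
```

Callers that pass null for the new parameter, as the other call sites in this diff do, therefore keep the previous query shape.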
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index 6a8e515b327..422803492d8 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -34,6 +34,7 @@ import
org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -73,33 +74,20 @@ public interface OverlordClient
/**
* Run a "kill" task for a particular datasource and interval. Shortcut to
{@link #runTask(String, Object)}.
- *
- * The kill task deletes all unused segment records from deep storage and
the metadata store. The task runs
- * asynchronously after the API call returns. The resolved future is the ID
of the task, which can be used to
- * monitor its progress through the {@link #taskStatus(String)} API.
- *
- * @param idPrefix Descriptive prefix to include at the start of task IDs
- * @param dataSource Datasource to kill
- * @param interval Interval to kill
- *
- * @return future with task ID
- */
- default ListenableFuture<String> runKillTask(String idPrefix, String
dataSource, Interval interval)
- {
- return runKillTask(idPrefix, dataSource, interval, null);
- }
-
- /**
- * Run a "kill" task for a particular datasource and interval. Shortcut to
{@link #runTask(String, Object)}.
- *
* The kill task deletes all unused segment records from deep storage and
the metadata store. The task runs
* asynchronously after the API call returns. The resolved future is the ID
of the task, which can be used to
* monitor its progress through the {@link #taskStatus(String)} API.
*
* @param idPrefix Descriptive prefix to include at the start of task IDs
* @param dataSource Datasource to kill
- * @param interval Interval to kill
+ * @param interval Umbrella interval to be considered by the kill task. Note that unused segments falling in this
+ * widened umbrella interval may have different {@code used_status_last_updated} time, so the kill task
+ * should also filter by {@code maxUsedStatusLastUpdatedTime}
* @param maxSegmentsToKill The maximum number of segments to kill
+ * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval}
+ * with {@code used_status_last_updated} no later than this time will be included in the
+ * kill task. Segments without {@code used_status_last_updated} time (due to an upgrade
+ * from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored
*
* @return future with task ID
*/
@@ -107,7 +95,8 @@ public interface OverlordClient
String idPrefix,
String dataSource,
Interval interval,
- @Nullable Integer maxSegmentsToKill
+ @Nullable Integer maxSegmentsToKill,
+ @Nullable DateTime maxUsedStatusLastUpdatedTime
)
{
final String taskId = IdUtils.newTaskId(idPrefix,
ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval);
@@ -117,7 +106,8 @@ public interface OverlordClient
interval,
false,
null,
- maxSegmentsToKill
+ maxSegmentsToKill,
+ maxUsedStatusLastUpdatedTime
);
return FutureUtils.transform(runTask(taskId, taskQuery), ignored ->
taskId);
}
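
Editor's note on the umbrella-interval problem described in the Javadoc above: the commit message gives the example of one unused segment that is 30 days old and another that is 1 hour old falling into the same kill interval. The sketch below illustrates how a cutoff of "now minus bufferPeriod" separates the two. All names are hypothetical, java.time replaces Joda-Time to keep the example dependency-free, and the null handling mirrors the SQL condition in this diff (rows without a timestamp do not match the filter), which is an interpretation rather than a statement about the kill task's full behavior.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Druid.
final class BufferPeriodSketch
{
  // Returns the ids whose used_status_last_updated is no later than the cutoff.
  // A null cutoff means "no time filter", matching the nullable parameter in the diff.
  static List<String> eligibleForKill(Map<String, Instant> lastUpdatedById, Instant maxUsedStatusLastUpdatedTime)
  {
    final List<String> eligible = new ArrayList<>();
    for (Map.Entry<String, Instant> entry : lastUpdatedById.entrySet()) {
      final Instant lastUpdated = entry.getValue();
      if (maxUsedStatusLastUpdatedTime == null) {
        eligible.add(entry.getKey());
      } else if (lastUpdated != null && !lastUpdated.isAfter(maxUsedStatusLastUpdatedTime)) {
        eligible.add(entry.getKey());
      }
    }
    return eligible;
  }

  public static void main(String[] args)
  {
    final Instant now = Instant.parse("2024-01-31T00:00:00Z");
    final Duration bufferPeriod = Duration.ofDays(30); // assumed value for the example

    final Map<String, Instant> unused = new LinkedHashMap<>();
    unused.put("segment-30d", now.minus(Duration.ofDays(30)));
    unused.put("segment-1h", now.minus(Duration.ofHours(1)));

    // Same derivation as in KillUnusedSegments: cutoff = now - bufferPeriod.
    final Instant cutoff = now.minus(bufferPeriod);
    System.out.println(eligibleForKill(unused, cutoff));
  }
}
```

With these inputs only the 30-day-old segment passes the cutoff; the 1-hour-old one is retained even though both fall inside the same interval.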
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index da7847f4505..fb166362345 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -20,10 +20,10 @@
package org.apache.druid.server.coordinator.duty;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
@@ -37,6 +37,7 @@ import
org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
+import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -46,13 +47,19 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
+ * <p>
* Completely removes information about unused segments who have an interval end that comes before
- * now - {@link #retainDuration} from the metadata store. retainDuration can be a positive or negative duration,
- * negative meaning the interval end target will be in the future. Also, retainDuration can be ignored,
- * meaning that there is no upper bound to the end interval of segments that will be killed. This action is called
- * "to kill a segment".
+ * now - {@link #durationToRetain} from the metadata store. {@link #durationToRetain} can be a positive or negative duration,
+ * negative meaning the interval end target will be in the future. Also, {@link #durationToRetain} can be ignored if
+ * {@link #ignoreDurationToRetain} is enabled, meaning that there is no upper bound to the end interval of segments that
+ * will be killed. The umbrella interval of the unused segments per datasource to be killed is determined by
+ * {@link #findIntervalForKill(String, DateTime)}, which takes into account the configured {@link #bufferPeriod}. However,
+ * the kill task needs to check again for max {@link #bufferPeriod} for the unused segments in the widened interval
+ * as there can be multiple unused segments with different {@code used_status_last_updated} time.
+ * </p>
* <p>
- * See org.apache.druid.indexing.common.task.KillUnusedSegmentsTask.
+ * See {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}.
+ * </p>
*/
public class KillUnusedSegments implements CoordinatorDuty
{
@@ -63,9 +70,9 @@ public class KillUnusedSegments implements CoordinatorDuty
&& (KILL_TASK_TYPE.equals(status.getType()) &&
status.getId().startsWith(TASK_ID_PREFIX));
private static final Logger log = new Logger(KillUnusedSegments.class);
- private final long period;
- private final long retainDuration;
- private final boolean ignoreRetainDuration;
+ private final Duration period;
+ private final Duration durationToRetain;
+ private final boolean ignoreDurationToRetain;
private final int maxSegmentsToKill;
/**
@@ -73,7 +80,7 @@ public class KillUnusedSegments implements CoordinatorDuty
* datasource.
*/
private final Map<String, DateTime> datasourceToLastKillIntervalEnd;
- private long lastKillTime = 0;
+ private DateTime lastKillTime;
private final long bufferPeriod;
private final SegmentsMetadataManager segmentsMetadataManager;
@@ -85,32 +92,37 @@ public class KillUnusedSegments implements CoordinatorDuty
DruidCoordinatorConfig config
)
{
- this.period = config.getCoordinatorKillPeriod().getMillis();
- Preconditions.checkArgument(
- this.period >= config.getCoordinatorIndexingPeriod().getMillis(),
- "coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod"
- );
-
- this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain();
- this.retainDuration = config.getCoordinatorKillDurationToRetain().getMillis();
- if (this.ignoreRetainDuration) {
+ if (config.getCoordinatorKillPeriod().getMillis() < config.getCoordinatorIndexingPeriod().getMillis()) {
+ throw InvalidInput.exception(
+ "druid.coordinator.kill.period[%s] must be >= druid.coordinator.period.indexingPeriod[%s]",
+ config.getCoordinatorKillPeriod(),
+ config.getCoordinatorIndexingPeriod()
+ );
+ }
+ if (config.getCoordinatorKillMaxSegments() < 0) {
+ throw InvalidInput.exception(
+ "druid.coordinator.kill.maxSegments[%s] is invalid. It must be a positive integer.",
+ config.getCoordinatorKillMaxSegments()
+ );
+ }
+ this.period = config.getCoordinatorKillPeriod();
+ this.ignoreDurationToRetain = config.getCoordinatorKillIgnoreDurationToRetain();
+ this.durationToRetain = config.getCoordinatorKillDurationToRetain();
+ if (this.ignoreDurationToRetain) {
log.debug(
- "druid.coordinator.kill.durationToRetain [%s] will be ignored when discovering segments to kill "
- + "because you have set druid.coordinator.kill.ignoreDurationToRetain to True.",
- this.retainDuration
+ "druid.coordinator.kill.durationToRetain[%s] will be ignored when discovering segments to kill "
+ + "because druid.coordinator.kill.ignoreDurationToRetain is set to true.",
+ this.durationToRetain
);
}
this.bufferPeriod = config.getCoordinatorKillBufferPeriod().getMillis();
-
this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments();
- Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
-
datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();
log.info(
- "Kill Task scheduling enabled with period [%s], retainDuration [%s], bufferPeriod [%s], maxSegmentsToKill [%s]",
+ "Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s]",
this.period,
- this.ignoreRetainDuration ? "IGNORING" : this.retainDuration,
+ this.ignoreDurationToRetain ? "IGNORING" : this.durationToRetain,
this.bufferPeriod,
this.maxSegmentsToKill
);
@@ -122,9 +134,12 @@ public class KillUnusedSegments implements CoordinatorDuty
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
{
- final long currentTimeMillis = System.currentTimeMillis();
- if (lastKillTime + period > currentTimeMillis) {
- log.debug("Skipping kill of unused segments as kill period has not elapsed yet.");
+ if (!canDutyRun()) {
+ log.debug(
+ "Skipping KillUnusedSegments until period[%s] has elapsed after lastKillTime[%s].",
+ period,
+ lastKillTime
+ );
return params;
}
@@ -159,10 +174,9 @@ public class KillUnusedSegments implements CoordinatorDuty
dataSourcesToKill =
segmentsMetadataManager.retrieveAllDataSourceNames();
}
- log.debug("Killing unused segments in datasources: %s",
dataSourcesToKill);
- lastKillTime = System.currentTimeMillis();
+ log.debug("Killing unused segments for datasources[%s]",
dataSourcesToKill);
+ lastKillTime = DateTimes.nowUtc();
taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill,
availableKillTaskSlots);
-
}
// any datasources that are no longer being considered for kill should
have their
@@ -196,26 +210,31 @@ public class KillUnusedSegments implements CoordinatorDuty
+ "on the next coordinator cycle.", submittedTasks,
availableKillTaskSlots));
break;
}
- final Interval intervalToKill = findIntervalForKill(dataSource);
+ final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
+ final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime);
if (intervalToKill == null) {
datasourceToLastKillIntervalEnd.remove(dataSource);
continue;
}
try {
- FutureUtils.getUnchecked(overlordClient.runKillTask(
- TASK_ID_PREFIX,
- dataSource,
- intervalToKill,
- maxSegmentsToKill
- ), true);
+ FutureUtils.getUnchecked(
+ overlordClient.runKillTask(
+ TASK_ID_PREFIX,
+ dataSource,
+ intervalToKill,
+ maxSegmentsToKill,
+ maxUsedStatusLastUpdatedTime
+ ),
+ true
+ );
++submittedTasks;
datasourceToLastKillIntervalEnd.put(dataSource,
intervalToKill.getEnd());
}
catch (Exception ex) {
- log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource);
+ log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill);
if (Thread.currentThread().isInterrupted()) {
- log.warn("skipping kill task scheduling because thread is interrupted.");
+ log.warn("Skipping kill task scheduling because thread is interrupted.");
break;
}
}
@@ -244,14 +263,13 @@ public class KillUnusedSegments implements CoordinatorDuty
* Calculates the interval for which segments are to be killed in a
datasource.
*/
@Nullable
- private Interval findIntervalForKill(String dataSource)
+ private Interval findIntervalForKill(String dataSource, DateTime maxUsedStatusLastUpdatedTime)
{
- final DateTime maxEndTime = ignoreRetainDuration
+ final DateTime maxEndTime = ignoreDurationToRetain
? DateTimes.COMPARE_DATE_AS_STRING_MAX
- : DateTimes.nowUtc().minus(retainDuration);
-
+ : DateTimes.nowUtc().minus(durationToRetain);
List<Interval> unusedSegmentIntervals = segmentsMetadataManager
- .getUnusedSegmentIntervals(dataSource, datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill, DateTimes.nowUtc().minus(bufferPeriod));
+ .getUnusedSegmentIntervals(dataSource, datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill, maxUsedStatusLastUpdatedTime);
if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
return null;
@@ -270,6 +288,11 @@ public class KillUnusedSegments implements CoordinatorDuty
);
}
+ private boolean canDutyRun()
+ {
+ return lastKillTime == null || !DateTimes.nowUtc().isBefore(lastKillTime.plus(period));
+ }
+
@VisibleForTesting
static int getKillTaskCapacity(int totalWorkerCapacity, double
killTaskSlotRatio, int maxKillTaskSlots)
{
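
Editor's note on canDutyRun() above: the duty runs on its first invocation (lastKillTime is null) and afterwards only once a full period has elapsed since the last run. The sketch below reproduces that gate in isolation. The class and the markRun method are hypothetical, the clock is passed in explicitly so the behavior can be checked deterministically, and java.time is used in place of Joda-Time.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration class; not part of Druid.
final class KillPeriodGateSketch
{
  private final Duration period;
  private Instant lastKillTime; // null until the duty has run once

  KillPeriodGateSketch(Duration period)
  {
    this.period = period;
  }

  // Same predicate as canDutyRun() in the diff, with "now" supplied by the caller.
  boolean canDutyRun(Instant now)
  {
    return lastKillTime == null || !now.isBefore(lastKillTime.plus(period));
  }

  // Records a run, as the duty does when it assigns lastKillTime.
  void markRun(Instant now)
  {
    lastKillTime = now;
  }

  public static void main(String[] args)
  {
    final KillPeriodGateSketch gate = new KillPeriodGateSketch(Duration.ofHours(1));
    final Instant start = Instant.parse("2024-01-31T00:00:00Z");

    System.out.println(gate.canDutyRun(start));                                  // first run is always allowed
    gate.markRun(start);
    System.out.println(gate.canDutyRun(start.plus(Duration.ofMinutes(30))));     // still inside the period
    System.out.println(gate.canDutyRun(start.plus(Duration.ofHours(1))));        // period has elapsed
  }
}
```

Using a nullable timestamp instead of the previous `long lastKillTime = 0` makes the "never ran" state explicit rather than relying on epoch arithmetic.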
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index a539a48ecb7..1546544029e 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -380,7 +380,7 @@ public class DataSourcesResource
final Interval theInterval = Intervals.of(interval.replace('_', '/'));
try {
final String killTaskId = FutureUtils.getUnchecked(
- overlordClient.runKillTask("api-issued", dataSourceName, theInterval, null),
+ overlordClient.runKillTask("api-issued", dataSourceName, theInterval, null, null),
true
);
auditManager.doAudit(
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
index 60edff93077..0059d048e06 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
@@ -48,7 +48,8 @@ public class ClientKillUnusedSegmentsTaskQueryTest
INTERVAL,
true,
BATCH_SIZE,
- LIMIT
+ LIMIT,
+ null
);
}
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 4c3534feacd..0626792f1fd 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -372,10 +372,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
private void markAllSegmentsUnused()
{
- markAllSegmentsUnused(SEGMENTS);
+ markAllSegmentsUnused(SEGMENTS, DateTimes.nowUtc());
}
- private void markAllSegmentsUnused(Set<DataSegment> segments)
+ private void markAllSegmentsUnused(Set<DataSegment> segments, DateTime usedStatusLastUpdatedTime)
{
for (final DataSegment segment : segments) {
Assert.assertEquals(
@@ -386,7 +386,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
"UPDATE %s SET used = false, used_status_last_updated =
:used_status_last_updated WHERE id = :id",
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable()
);
- return handle.createStatement(request).bind("id", segment.getId().toString()).bind("used_status_last_updated", DateTimes.nowUtc().toString()).execute();
+ return handle.createStatement(request)
+ .bind("id", segment.getId().toString())
+ .bind("used_status_last_updated", usedStatusLastUpdatedTime.toString()
+ ).execute();
}
)
);
@@ -977,7 +980,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void testRetrieveSegmentForId()
{
insertUsedSegments(ImmutableSet.of(defaultSegment));
- markAllSegmentsUnused(ImmutableSet.of(defaultSegment));
+ markAllSegmentsUnused(ImmutableSet.of(defaultSegment), DateTimes.nowUtc());
Assert.assertEquals(defaultSegment,
coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), true));
}
@@ -1147,11 +1150,12 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void testRetrieveUnusedSegmentsUsingSingleIntervalAndNoLimit() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final List<DataSegment> actualUnusedSegments =
coordinator.retrieveUnusedSegmentsForInterval(
DS.WIKI,
Intervals.of("1900/3000"),
+ null,
null
);
@@ -1163,13 +1167,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitAtRange()
throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final int requestedLimit = segments.size();
final List<DataSegment> actualUnusedSegments =
coordinator.retrieveUnusedSegmentsForInterval(
DS.WIKI,
Intervals.of("1900/3000"),
- requestedLimit
+ requestedLimit,
+ null
);
Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
@@ -1180,13 +1185,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitInRange()
throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final int requestedLimit = segments.size() - 1;
final List<DataSegment> actualUnusedSegments =
coordinator.retrieveUnusedSegmentsForInterval(
DS.WIKI,
Intervals.of("1900/3000"),
- requestedLimit
+ requestedLimit,
+ null
);
Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
@@ -1197,13 +1203,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void
testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitOutOfRange() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final int limit = segments.size() + 1;
final List<DataSegment> actualUnusedSegments =
coordinator.retrieveUnusedSegmentsForInterval(
DS.WIKI,
Intervals.of("1900/3000"),
- limit
+ limit,
+ null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(actualUnusedSegments.containsAll(segments));
@@ -1213,7 +1220,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void testRetrieveUnusedSegmentsUsingSingleIntervalOutOfRange() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1905,
1910);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final Interval outOfRangeInterval = Intervals.of("1700/1800");
Assert.assertTrue(segments.stream()
@@ -1223,7 +1230,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
final List<DataSegment> actualUnusedSegments =
coordinator.retrieveUnusedSegmentsForInterval(
DS.WIKI,
outOfRangeInterval,
- limit
+ limit,
+ null
);
Assert.assertEquals(0, actualUnusedSegments.size());
}
@@ -1232,12 +1240,13 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit()
throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
null,
null,
+ null,
null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
@@ -1248,12 +1257,13 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void
testRetrieveUnusedSegmentsUsingNoIntervalsNoLimitAndNoLastSegmentId() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
ImmutableList.of(),
null,
null,
+ null,
null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
@@ -1264,7 +1274,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void
testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegmentId() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(2033,
2133);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
String lastSegmentId = segments.get(9).getId().toString();
final List<DataSegment> expectedSegmentsAscOrder = segments.stream()
@@ -1274,6 +1284,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableList.of(),
null,
lastSegmentId,
+ null,
null
);
Assert.assertEquals(expectedSegmentsAscOrder.size(),
actualUnusedSegments.size());
@@ -1283,7 +1294,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableList.of(),
null,
lastSegmentId,
- SortOrder.ASC
+ SortOrder.ASC,
+ null
);
Assert.assertEquals(expectedSegmentsAscOrder.size(),
actualUnusedSegments.size());
Assert.assertEquals(expectedSegmentsAscOrder, actualUnusedSegments);
@@ -1297,7 +1309,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableList.of(),
null,
lastSegmentId,
- SortOrder.DESC
+ SortOrder.DESC,
+ null
);
Assert.assertEquals(expectedSegmentsDescOrder.size(),
actualUnusedSegments.size());
Assert.assertEquals(expectedSegmentsDescOrder, actualUnusedSegments);
@@ -1307,12 +1320,13 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
segments.size(),
null,
+ null,
null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
@@ -1323,13 +1337,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final int requestedLimit = segments.size() - 1;
final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
requestedLimit,
null,
+ null,
null
);
Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
@@ -1340,7 +1355,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsInSingleBatchLimitAndLastSegmentId()
throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(2034,
2133);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final int requestedLimit = segments.size();
final String lastSegmentId = segments.get(4).getId().toString();
@@ -1348,6 +1363,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
requestedLimit,
lastSegmentId,
+ null,
null
);
Assert.assertEquals(segments.size() - 5, actualUnusedSegments.size());
@@ -1361,7 +1377,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void
testRetrieveUnusedSegmentsUsingMultipleIntervalsLimitAndLastSegmentId() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final int requestedLimit = segments.size() - 1;
final String lastSegmentId = segments.get(4).getId().toString();
@@ -1369,6 +1385,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
requestedLimit,
lastSegmentId,
+ null,
null
);
Assert.assertEquals(requestedLimit - 4, actualUnusedSegments.size());
@@ -1382,12 +1399,13 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void testRetrieveUnusedSegmentsUsingMultipleIntervals() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
2133);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final ImmutableList<DataSegment> actualUnusedSegments =
retrieveUnusedSegments(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
segments.size() + 1,
null,
+ null,
null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
@@ -1398,7 +1416,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
public void testRetrieveUnusedSegmentsUsingIntervalOutOfRange() throws
IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1905,
1910);
- markAllSegmentsUnused(new HashSet<>(segments));
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
final Interval outOfRangeInterval = Intervals.of("1700/1800");
Assert.assertTrue(segments.stream()
@@ -1408,11 +1426,82 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableList.of(outOfRangeInterval),
null,
null,
- null
+ null,
+ null
);
Assert.assertEquals(0, actualUnusedSegments.size());
}
+ @Test
+ public void testRetrieveUnusedSegmentsWithMaxUsedStatusLastUpdatedTime() throws IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
+ markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+
+ final Interval interval = Intervals.of("1905/1920");
+
+ final ImmutableList<DataSegment> actualUnusedSegments1 =
retrieveUnusedSegments(
+ ImmutableList.of(interval),
+ null,
+ null,
+ null,
+ DateTimes.nowUtc()
+ );
+ Assert.assertEquals(5, actualUnusedSegments1.size());
+
+ final ImmutableList<DataSegment> actualUnusedSegments2 =
retrieveUnusedSegments(
+ ImmutableList.of(interval),
+ null,
+ null,
+ null,
+ DateTimes.nowUtc().minusHours(1)
+ );
+ Assert.assertEquals(0, actualUnusedSegments2.size());
+ }
+
+ @Test
+ public void testRetrieveUnusedSegmentsWithMaxUsedStatusLastUpdatedTime2()
throws IOException
+ {
+ final List<DataSegment> segments = createAndGetUsedYearSegments(1900,
1950);
+ final List<DataSegment> evenYearSegments = new ArrayList<>();
+ final List<DataSegment> oddYearSegments = new ArrayList<>();
+
+ for (int i = 0; i < segments.size(); i++) {
+ DataSegment dataSegment = segments.get(i);
+ if (i % 2 == 0) {
+ evenYearSegments.add(dataSegment);
+ } else {
+ oddYearSegments.add(dataSegment);
+ }
+ }
+
+ final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc();
+ markAllSegmentsUnused(new HashSet<>(oddYearSegments), maxUsedStatusLastUpdatedTime1);
+
+ final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc();
+ markAllSegmentsUnused(new HashSet<>(evenYearSegments), maxUsedStatusLastUpdatedTime2);
+
+ final Interval interval = Intervals.of("1900/1950");
+
+ final ImmutableList<DataSegment> actualUnusedSegments1 =
retrieveUnusedSegments(
+ ImmutableList.of(interval),
+ null,
+ null,
+ null,
+ maxUsedStatusLastUpdatedTime1
+ );
+ Assert.assertEquals(oddYearSegments.size(), actualUnusedSegments1.size());
+
+ final ImmutableList<DataSegment> actualUnusedSegments2 =
retrieveUnusedSegments(
+ ImmutableList.of(interval),
+ null,
+ null,
+ null,
+ maxUsedStatusLastUpdatedTime2
+ );
+ Assert.assertEquals(segments.size(), actualUnusedSegments2.size());
+ }
+
@Test
public void testSimpleUnusedList() throws IOException
{
@@ -1423,7 +1512,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableSet.copyOf(
coordinator.retrieveUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
- defaultSegment.getInterval()
+ defaultSegment.getInterval(),
+ null,
+ null
)
)
);
@@ -1439,7 +1530,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
coordinator.retrieveUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
defaultSegment.getInterval(),
- limit
+ limit,
+ null
)
);
Assert.assertEquals(limit, retreivedUnusedSegments.size());
@@ -1551,7 +1643,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
new Interval(
defaultSegment.getInterval().getStart().minus(1),
defaultSegment.getInterval().getStart().plus(1)
- )
+ ),
+ null,
+ null
).isEmpty()
);
}
@@ -1564,7 +1658,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertTrue(
coordinator.retrieveUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
- new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd())
+ new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd()),
+ null,
+ null
).isEmpty()
);
}
@@ -1578,7 +1674,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertTrue(
coordinator.retrieveUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
- new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1))
+ new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1)),
+ null,
+ null
).isEmpty()
);
}
@@ -1591,7 +1689,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertTrue(
coordinator.retrieveUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
- defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1))
+ defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1)),
+ null,
+ null
).isEmpty()
);
}
@@ -1606,7 +1706,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableSet.copyOf(
coordinator.retrieveUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
- Intervals.of("2000/2999")
+ Intervals.of("2000/2999"),
+ null,
+ null
)
)
);
@@ -1622,7 +1724,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableSet.copyOf(
coordinator.retrieveUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
- defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1))
+ defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1)),
+ null,
+ null
)
)
);
@@ -1631,7 +1735,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableSet.copyOf(
coordinator.retrieveUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
- defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1))
+ defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1)),
+ null,
+ null
)
)
);
@@ -1647,7 +1753,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableSet.copyOf(
coordinator.retrieveUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
- defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1))
+ defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1)),
+ null,
+ null
)
)
);
@@ -1656,7 +1764,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableSet.copyOf(
coordinator.retrieveUnusedSegmentsForInterval(
defaultSegment.getDataSource(),
- defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1))
+ defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1)),
+ null,
+ null
)
)
);
@@ -2189,7 +2299,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(1, identifier3.getShardSpec().getNumCorePartitions());
// now drop the used segment previously loaded:
- markAllSegmentsUnused(ImmutableSet.of(segment));
+ markAllSegmentsUnused(ImmutableSet.of(segment), DateTimes.nowUtc());
// and final load, this reproduces an issue that could happen with multiple streaming appends,
// followed by a reindex, followed by a drop, and more streaming data coming in for same interval
@@ -2208,7 +2318,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
}
/**
- * Slightly different that the above test but that involves reverted compaction
+ * Slightly different from the above test that involves reverted compaction
* 1) used segments of version = A, id = 0, 1, 2
* 2) overwrote segments of version = B, id = 0 <= compaction
* 3) marked segments unused for version = A, id = 0, 1, 2 <= overshadowing
@@ -2354,7 +2464,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
// 5) reverted compaction (by marking B_0 as unused)
// Revert compaction a manual metadata update which is basically the following two steps:
- markAllSegmentsUnused(ImmutableSet.of(compactedSegment)); // <- drop compacted segment
+ markAllSegmentsUnused(ImmutableSet.of(compactedSegment), DateTimes.nowUtc()); // <- drop compacted segment
// pending: version = A, id = 0,1,2
// version = B, id = 1
//
@@ -2896,7 +3006,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableSet.copyOf(
coordinator.retrieveUnusedSegmentsForInterval(
existingSegment1.getDataSource(),
- existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1))
+ existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1)),
+ null,
+ null
)
)
);
@@ -2905,7 +3017,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableSet.copyOf(
coordinator.retrieveUnusedSegmentsForInterval(
existingSegment2.getDataSource(),
- existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1))
+ existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1)),
+ null,
+ null
)
)
);
@@ -2928,7 +3042,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableSet.copyOf(
coordinator.retrieveUnusedSegmentsForInterval(
existingSegment1.getDataSource(),
- existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1))
+ existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1)),
+ null,
+ null
)
)
);
@@ -2937,7 +3053,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableSet.copyOf(
coordinator.retrieveUnusedSegmentsForInterval(
existingSegment2.getDataSource(),
- existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1))
+ existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1)),
+ null,
+ null
)
)
);
@@ -3100,7 +3218,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments));
// Mark the tombstone as unused
- markAllSegmentsUnused(tombstones);
+ markAllSegmentsUnused(tombstones, DateTimes.nowUtc());
final Collection<DataSegment> allUsedSegments = coordinator.retrieveAllUsedSegments(
DS.WIKI,
@@ -3154,7 +3272,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments));
// Mark the tombstone as unused
- markAllSegmentsUnused(tombstones);
+ markAllSegmentsUnused(tombstones, DateTimes.nowUtc());
final Collection<DataSegment> allUsedSegments = coordinator.retrieveAllUsedSegments(
DS.WIKI,
@@ -3205,7 +3323,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
final List<Interval> intervals,
final Integer limit,
final String lastSegmentId,
- final SortOrder sortOrder
+ final SortOrder sortOrder,
+ final DateTime maxUsedStatusLastUpdatedTime
)
{
return derbyConnector.inReadOnlyTransaction(
@@ -3217,7 +3336,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
derbyConnectorRule.metadataTablesConfigSupplier().get(),
mapper
)
- .retrieveUnusedSegments(DS.WIKI, intervals, limit, lastSegmentId, sortOrder)) {
+ .retrieveUnusedSegments(DS.WIKI, intervals, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
return ImmutableList.copyOf(iterator);
}
}
diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
index 7ba5916a771..cad3f21f06a 100644
--- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
+++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
@@ -433,6 +433,7 @@ public class OverlordClientImplTest
null,
null,
null,
+ null,
null
);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 5adb345c9fa..a174b9e2264 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -60,6 +60,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
/**
@@ -68,10 +69,10 @@ import static org.mockito.ArgumentMatchers.anyString;
@RunWith(MockitoJUnitRunner.class)
public class KillUnusedSegmentsTest
{
- private static final int MAX_SEGMENTS_TO_KILL = 10;
- private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardMinutes(2);
+ private static final Duration INDEXING_PERIOD = Duration.standardSeconds(1);
+ private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardSeconds(1);
private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1);
- private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1);
+ private static final int MAX_SEGMENTS_TO_KILL = 10;
private static final String DATASOURCE = "DS1";
@Mock
@@ -174,7 +175,7 @@ public class KillUnusedSegmentsTest
mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
target.run(params);
Mockito.verify(overlordClient, Mockito.never())
- .runKillTask(anyString(), anyString(), any(Interval.class));
+ .runKillTask(anyString(), anyString(), any(Interval.class), anyInt(), any(DateTime.class));
}
@Test
@@ -192,7 +193,7 @@ public class KillUnusedSegmentsTest
mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
target.run(params);
Mockito.verify(overlordClient, Mockito.never())
- .runKillTask(anyString(), anyString(), any(Interval.class));
+ .runKillTask(anyString(), anyString(), any(Interval.class), anyInt(), any(DateTime.class));
}
@Test
@@ -364,48 +365,24 @@ public class KillUnusedSegmentsTest
{
int limit = config.getCoordinatorKillMaxSegments();
Mockito.doReturn(Futures.immediateFuture("ok"))
- .when(overlordClient)
- .runKillTask(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.anyString(),
- ArgumentMatchers.any(Interval.class),
- ArgumentMatchers.anyInt());
+ .when(overlordClient)
+ .runKillTask(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.any(Interval.class),
+ ArgumentMatchers.anyInt(),
+ ArgumentMatchers.any(DateTime.class));
target.runInternal(params);
Mockito.verify(overlordClient, Mockito.times(1)).runKillTask(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq(DATASOURCE),
ArgumentMatchers.eq(expectedKillInterval),
- ArgumentMatchers.eq(limit)
+ ArgumentMatchers.eq(limit),
+ ArgumentMatchers.any()
);
}
- private void runAndVerifyKillIntervals(List<Interval> expectedKillIntervals)
- {
- int limit = config.getCoordinatorKillMaxSegments();
- Mockito.doReturn(Futures.immediateFuture("ok"))
- .when(overlordClient)
- .runKillTask(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.anyString(),
- ArgumentMatchers.any(Interval.class),
- ArgumentMatchers.anyInt());
- for (int i = 0; i < expectedKillIntervals.size(); i++) {
- target.run(params);
- verifyState(ImmutableMap.of(DATASOURCE, yearOldSegment.getInterval().getEnd()));
- verifyStats(9, 1, 10);
- }
-
- for (Interval expectedKillInterval : expectedKillIntervals) {
- Mockito.verify(overlordClient, Mockito.times(1)).runKillTask(
- ArgumentMatchers.anyString(),
- ArgumentMatchers.eq(DATASOURCE),
- ArgumentMatchers.eq(expectedKillInterval),
- ArgumentMatchers.eq(limit)
- );
- }
- }
-
private void verifyStats(int availableSlots, int submittedTasks, int maxSlots)
{
verifyStats(availableSlots, submittedTasks, maxSlots, 1);
@@ -430,7 +407,8 @@ public class KillUnusedSegmentsTest
ArgumentMatchers.anyString(),
ArgumentMatchers.anyString(),
ArgumentMatchers.any(Interval.class),
- ArgumentMatchers.anyInt()
+ ArgumentMatchers.anyInt(),
+ ArgumentMatchers.any(DateTime.class)
);
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
index cfb8fec941e..1e2f2d462c4 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -217,7 +217,7 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
@Nullable final DateTime minStartTime,
final DateTime maxEndTime,
final int limit,
- final DateTime maxUsedFlagLastUpdatedTime
+ final DateTime maxUsedStatusLastUpdatedTime
)
{
return null;
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
index b83ebf67527..089cd2e1d29 100644
--- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
@@ -591,7 +591,7 @@ public class DataSourcesResourceTest
Interval theInterval = Intervals.of(interval.replace('_', '/'));
OverlordClient overlordClient = EasyMock.createStrictMock(OverlordClient.class);
- EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", theInterval, null))
+ EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", theInterval, null, null))
.andReturn(Futures.immediateFuture("kill_task_1"));
EasyMock.replay(overlordClient, server);
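
The retrieval tests in this change check that an unused segment is returned only when its used_status_last_updated timestamp is not later than maxUsedStatusLastUpdatedTime, and that passing null applies no cutoff. A minimal standalone sketch of that cutoff follows. It is not Druid code: the class BufferPeriodSketch, the UnusedSegment holder, and the eligibleForKill helper are hypothetical names used for illustration only, and the inclusive comparison is an assumption inferred from the test expectations above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class BufferPeriodSketch
{
  // Hypothetical stand-in for an unused segment row: an id plus the
  // value of its used_status_last_updated column.
  static final class UnusedSegment
  {
    final String id;
    final Instant usedStatusLastUpdated;

    UnusedSegment(String id, Instant usedStatusLastUpdated)
    {
      this.id = id;
      this.usedStatusLastUpdated = usedStatusLastUpdated;
    }
  }

  // Returns the ids of segments whose last status update is at or before the
  // cutoff. A null cutoff applies no filter (assumption: mirrors the null
  // arguments passed in the tests above).
  static List<String> eligibleForKill(List<UnusedSegment> unused, Instant maxUsedStatusLastUpdatedTime)
  {
    final List<String> eligible = new ArrayList<>();
    for (UnusedSegment segment : unused) {
      if (maxUsedStatusLastUpdatedTime == null
          || !segment.usedStatusLastUpdated.isAfter(maxUsedStatusLastUpdatedTime)) {
        eligible.add(segment.id);
      }
    }
    return eligible;
  }

  public static void main(String[] args)
  {
    final Instant now = Instant.parse("2024-01-31T09:00:00Z");
    final List<UnusedSegment> unused = List.of(
        new UnusedSegment("unused-30-days", now.minus(Duration.ofDays(30))),
        new UnusedSegment("unused-1-hour", now.minus(Duration.ofHours(1)))
    );

    // With a one-day buffer period, only the 30-day-old segment qualifies.
    final Instant cutoff = now.minus(Duration.ofDays(1));
    System.out.println(eligibleForKill(unused, cutoff)); // prints [unused-30-days]
  }
}
```

This mirrors the example in the commit message: with a buffer period in effect, the segment marked unused one hour ago is retained even though it falls inside the same umbrella interval as the 30-day-old one.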