This is an automated email from the ASF dual-hosted git repository.
zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ba957a9b97 Add ability to limit the number of segments killed in kill
task (#14662)
ba957a9b97 is described below
commit ba957a9b97b6bdbecb166db771721cced1dac365
Author: zachjsh <[email protected]>
AuthorDate: Thu Aug 3 22:17:04 2023 -0400
Add ability to limit the number of segments killed in kill task (#14662)
### Description
Previously, the `maxSegments` configured for auto kill could be ignored if
an interval of data for a given datasource had more than this number of unused
segments, causing the kill task spawned to delete unused segments in that
interval to delete more than the configured `maxSegments`. Now each kill task
spawned by the auto kill coordinator duty will kill at most `limit` segments.
This is done by adding a new config property to
the `KillUnusedSegmentTas [...]
---
docs/data-management/delete.md | 9 +-
.../actions/RetrieveUnusedSegmentsAction.java | 20 ++-
.../druid/indexing/common/task/ArchiveTask.java | 2 +-
.../common/task/KillUnusedSegmentsTask.java | 119 ++++++++++++-----
.../druid/indexing/common/task/MoveTask.java | 2 +-
.../druid/indexing/common/task/RestoreTask.java | 2 +-
.../apache/druid/indexing/common/task/Task.java | 2 +-
.../actions/RetrieveSegmentsActionsTest.java | 2 +-
...ClientKillUnusedSegmentsTaskQuerySerdeTest.java | 13 +-
.../common/task/KillUnusedSegmentsTaskTest.java | 144 ++++++++++++++++++++-
.../druid/indexing/overlord/TaskLifecycleTest.java | 101 ++++++++++++++-
.../TestIndexerMetadataStorageCoordinator.java | 17 +++
.../ClientKillUnusedSegmentsTaskQuery.java | 23 +++-
.../IndexerMetadataStorageCoordinator.java | 19 ++-
.../IndexerSQLMetadataStorageCoordinator.java | 12 +-
.../druid/metadata/SqlSegmentsMetadataManager.java | 2 +-
.../druid/metadata/SqlSegmentsMetadataQuery.java | 17 ++-
.../apache/druid/rpc/indexing/OverlordClient.java | 33 ++++-
.../coordinator/duty/KillUnusedSegments.java | 10 +-
.../druid/server/http/DataSourcesResource.java | 2 +-
.../ClientKillUnusedSegmentsTaskQueryTest.java | 18 ++-
.../IndexerSQLMetadataStorageCoordinatorTest.java | 16 +++
.../druid/rpc/indexing/OverlordClientImplTest.java | 9 +-
.../coordinator/duty/KillUnusedSegmentsTest.java | 4 +-
.../druid/server/http/DataSourcesResourceTest.java | 2 +-
25 files changed, 526 insertions(+), 74 deletions(-)
diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md
index 260a66a174..9e59c751bc 100644
--- a/docs/data-management/delete.md
+++ b/docs/data-management/delete.md
@@ -96,15 +96,18 @@ The available grammar is:
"dataSource": <task_datasource>,
"interval" : <all_unused_segments_in_this_interval_will_die!>,
"context": <task context>,
- "batchSize": <optional_batch size>
+ "batchSize": <optional_batch size>,
+ "limit": <the maximum number of segments to delete>
}
```
Some of the parameters used in the task payload are further explained below:
-| Parameter |Default| Explanation
|
-|--------------|-------|--------------------------------------------------------------------------------------------------------|
+| Parameter | Default | Explanation
|
+|-------------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `batchSize` |100 | Maximum number of segments that are deleted in one
kill batch. Some operations on the Overlord may get stuck while a `kill` task
is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus,
a `kill` task splits the list of unused segments to be deleted into smaller
batches to yield the Overlord resources intermittently to other task
operations.|
+| `limit` | null - no limit | Maximum number of segments for the kill task
to delete.|
+
**WARNING:** The `kill` task permanently removes all information about the
affected segments from the metadata store and
deep storage. This operation cannot be undone.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
index f114ec4564..150648858c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java
@@ -27,6 +27,8 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
+
import java.util.List;
public class RetrieveUnusedSegmentsAction implements
TaskAction<List<DataSegment>>
@@ -37,14 +39,19 @@ public class RetrieveUnusedSegmentsAction implements
TaskAction<List<DataSegment
@JsonIgnore
private final Interval interval;
+ @JsonIgnore
+ private final Integer limit;
+
@JsonCreator
public RetrieveUnusedSegmentsAction(
@JsonProperty("dataSource") String dataSource,
- @JsonProperty("interval") Interval interval
+ @JsonProperty("interval") Interval interval,
+ @JsonProperty("limit") @Nullable Integer limit
)
{
this.dataSource = dataSource;
this.interval = interval;
+ this.limit = limit;
}
@JsonProperty
@@ -59,6 +66,13 @@ public class RetrieveUnusedSegmentsAction implements
TaskAction<List<DataSegment
return interval;
}
+ @Nullable
+ @JsonProperty
+ public Integer getLimit()
+ {
+ return limit;
+ }
+
@Override
public TypeReference<List<DataSegment>> getReturnTypeReference()
{
@@ -68,7 +82,8 @@ public class RetrieveUnusedSegmentsAction implements
TaskAction<List<DataSegment
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
- return
toolbox.getIndexerMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(dataSource,
interval);
+ return toolbox.getIndexerMetadataStorageCoordinator()
+ .retrieveUnusedSegmentsForInterval(dataSource, interval, limit);
}
@Override
@@ -83,6 +98,7 @@ public class RetrieveUnusedSegmentsAction implements
TaskAction<List<DataSegment
return getClass().getSimpleName() + "{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
+ ", limit=" + limit +
'}';
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
index 904181a243..42d04316a4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java
@@ -79,7 +79,7 @@ public class ArchiveTask extends AbstractFixedIntervalTask
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
- .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(),
myLock.getInterval()));
+ .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(),
myLock.getInterval(), null));
// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 0d54ae96b0..35653aa230 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -26,8 +26,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -37,6 +37,7 @@ import
org.apache.druid.indexing.common.actions.SegmentNukeAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
@@ -44,6 +45,8 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -61,6 +64,7 @@ import java.util.stream.Collectors;
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
+ public static final String TYPE = "kill";
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
/**
@@ -74,14 +78,16 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;
private final boolean markAsUnused;
-
/**
* Split processing to try and keep each nuke operation relatively short, in
the case that either
* the database or the storage layer is particularly slow.
*/
private final int batchSize;
+ @Nullable private final Integer limit;
+
- // counter included primarily for testing
+ // counters included primarily for testing
+ private int numSegmentsKilled = 0;
private long numBatchesProcessed = 0;
@JsonCreator
@@ -90,8 +96,9 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("context") Map<String, Object> context,
- @JsonProperty("markAsUnused") Boolean markAsUnused,
- @JsonProperty("batchSize") Integer batchSize
+ @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
+ @JsonProperty("batchSize") Integer batchSize,
+ @JsonProperty("limit") @Nullable Integer limit
)
{
super(
@@ -103,6 +110,19 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize :
DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
Preconditions.checkArgument(this.batchSize > 0, "batchSize should be
greater than zero");
+ if (null != limit && limit <= 0) {
+ throw InvalidInput.exception(
+ "limit [%d] is invalid. It must be a positive integer.",
+ limit
+ );
+ }
+ if (limit != null && markAsUnused != null && markAsUnused) {
+ throw InvalidInput.exception(
+ "limit cannot be provided with markAsUnused.",
+ limit
+ );
+ }
+ this.limit = limit;
}
@JsonProperty
@@ -119,10 +139,17 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
return batchSize;
}
+ @Nullable
+ @JsonProperty
+ public Integer getLimit()
+ {
+ return limit;
+ }
+
@Override
public String getType()
{
- return "kill";
+ return TYPE;
}
@Nonnull
@@ -140,6 +167,13 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
return numBatchesProcessed;
}
+ @JsonIgnore
+ @VisibleForTesting
+ long getNumSegmentsKilled()
+ {
+ return numSegmentsKilled;
+ }
+
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
@@ -153,27 +187,29 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
}
// List unused segments
- final List<DataSegment> allUnusedSegments = toolbox
- .getTaskActionClient()
- .submit(new RetrieveUnusedSegmentsAction(getDataSource(),
getInterval()));
-
- final List<List<DataSegment>> unusedSegmentBatches =
Lists.partition(allUnusedSegments, batchSize);
-
- // The individual activities here on the toolbox have possibility to run
for a longer period of time,
- // since they involve calls to metadata storage and archival object
storage. And, the tasks take hold of the
- // task lockbox to run. By splitting the segment list into smaller
batches, we have an opportunity to yield the
- // lock to other activity that might need to happen using the overlord
tasklockbox.
-
- LOG.info("Running kill task[%s] for dataSource[%s] and interval[%s].
Killing total [%,d] unused segments in [%d] batches(batchSize = [%d]).",
- getId(), getDataSource(), getInterval(), allUnusedSegments.size(),
unusedSegmentBatches.size(), batchSize);
+ int nextBatchSize = computeNextBatchSize(numSegmentsKilled);
+ @Nullable Integer numTotalBatches = getNumTotalBatches();
+ List<DataSegment> unusedSegments;
+ LOG.info(
+ "Starting kill with batchSize[%d], up to limit[%d] segments will be
deleted%s",
+ batchSize,
+ limit,
+ numTotalBatches != null ? StringUtils.format(" in([%d] batches]).",
numTotalBatches) : "."
+ );
+ do {
+ if (nextBatchSize <= 0) {
+ break;
+ }
+ unusedSegments = toolbox
+ .getTaskActionClient()
+ .submit(new RetrieveUnusedSegmentsAction(getDataSource(),
getInterval(), nextBatchSize));
- for (final List<DataSegment> unusedSegments : unusedSegmentBatches) {
if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
- "Locks[%s] for task[%s] can't cover segments[%s]",
-
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
- getId(),
- unusedSegments
+ "Locks[%s] for task[%s] can't cover segments[%s]",
+
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
+ getId(),
+ unusedSegments
);
}
@@ -186,19 +222,40 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new
HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);
numBatchesProcessed++;
+ numSegmentsKilled += unusedSegments.size();
- if (numBatchesProcessed % 10 == 0) {
- LOG.info("Processed [%d/%d] batches for kill task[%s].",
- numBatchesProcessed, unusedSegmentBatches.size(), getId());
- }
- }
+ LOG.info("Processed [%d] batches for kill task[%s].",
numBatchesProcessed, getId());
- LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s].
Deleted total [%,d] unused segments in [%d] batches.",
- getId(), getDataSource(), getInterval(), allUnusedSegments.size(),
unusedSegmentBatches.size());
+ nextBatchSize = computeNextBatchSize(numSegmentsKilled);
+ } while (unusedSegments.size() != 0 && (null == numTotalBatches ||
numBatchesProcessed < numTotalBatches));
+
+ LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s].
Deleted total [%d] unused segments "
+ + "in [%d] batches.",
+ getId(),
+ getDataSource(),
+ getInterval(),
+ numSegmentsKilled,
+ numBatchesProcessed
+ );
return TaskStatus.success(getId());
}
+ @JsonIgnore
+ @VisibleForTesting
+ @Nullable
+ Integer getNumTotalBatches()
+ {
+ return null != limit ? (int) Math.ceil((double) limit / batchSize) : null;
+ }
+
+ @JsonIgnore
+ @VisibleForTesting
+ int computeNextBatchSize(int numSegmentsKilled)
+ {
+ return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) :
batchSize;
+ }
+
private NavigableMap<DateTime, List<TaskLock>>
getTaskLockMap(TaskActionClient client) throws IOException
{
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
index 3e8b792eb8..d23b3820db 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java
@@ -87,7 +87,7 @@ public class MoveTask extends AbstractFixedIntervalTask
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
- .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(),
myLock.getInterval()));
+ .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(),
myLock.getInterval(), null));
// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
index 5e91ad3375..1364bcb597 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java
@@ -80,7 +80,7 @@ public class RestoreTask extends AbstractFixedIntervalTask
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
- .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(),
myLock.getInterval()));
+ .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(),
myLock.getInterval(), null));
// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 5c3186ef39..58a2ad435b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -61,7 +61,7 @@ import java.util.Set;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
- @Type(name = "kill", value = KillUnusedSegmentsTask.class),
+ @Type(name = KillUnusedSegmentsTask.TYPE, value =
KillUnusedSegmentsTask.class),
@Type(name = "move", value = MoveTask.class),
@Type(name = "archive", value = ArchiveTask.class),
@Type(name = "restore", value = RestoreTask.class),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
index 6149208fc3..2d360dfeb5 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java
@@ -104,7 +104,7 @@ public class RetrieveSegmentsActionsTest
@Test
public void testRetrieveUnusedSegmentsAction()
{
- final RetrieveUnusedSegmentsAction action = new
RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL);
+ final RetrieveUnusedSegmentsAction action = new
RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null);
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task,
actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, resultSegments);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
index 70cd5fcf19..3ab6bae468 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java
@@ -51,8 +51,9 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
"killTaskId",
"datasource",
Intervals.of("2020-01-01/P1D"),
- true,
- 99
+ false,
+ 99,
+ 5
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask)
objectMapper.readValue(json, Task.class);
@@ -61,6 +62,8 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertEquals(taskQuery.getMarkAsUnused(),
fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getBatchSize(),
Integer.valueOf(fromJson.getBatchSize()));
+ Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());
+
}
@Test
@@ -71,6 +74,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
"datasource",
Intervals.of("2020-01-01/P1D"),
true,
+ null,
null
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
@@ -80,6 +84,7 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertEquals(taskQuery.getMarkAsUnused(),
fromJson.isMarkAsUnused());
Assert.assertEquals(100, fromJson.getBatchSize());
+ Assert.assertNull(taskQuery.getLimit());
}
@Test
@@ -91,7 +96,8 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Intervals.of("2020-01-01/P1D"),
null,
true,
- 99
+ 99,
+ null
);
final byte[] json = objectMapper.writeValueAsBytes(task);
final ClientKillUnusedSegmentsTaskQuery taskQuery =
(ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
@@ -103,5 +109,6 @@ public class ClientKillUnusedSegmentsTaskQuerySerdeTest
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()),
taskQuery.getBatchSize());
+ Assert.assertNull(task.getLimit());
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
index f57624ed7d..3a89c19a78 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java
@@ -81,6 +81,7 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
Intervals.of("2019-03-01/2019-04-01"),
null,
false,
+ null,
null
);
@@ -97,7 +98,9 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
- Assert.assertEquals(1L, task.getNumBatchesProcessed());
+
+ Assert.assertEquals(2L, task.getNumBatchesProcessed());
+ Assert.assertEquals(1, task.getNumSegmentsKilled());
}
@@ -128,6 +131,7 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
Intervals.of("2019-03-01/2019-04-01"),
null,
true,
+ null,
null
);
@@ -144,7 +148,8 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
- Assert.assertEquals(1L, task.getNumBatchesProcessed());
+ Assert.assertEquals(2L, task.getNumBatchesProcessed());
+ Assert.assertEquals(1, task.getNumSegmentsKilled());
}
@Test
@@ -157,13 +162,14 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
Intervals.of("2019-03-01/2019-04-01"),
null,
true,
+ null,
null
);
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}
@Test
- public void testKillBatchSizeOne() throws Exception
+ public void testKillBatchSizeOneAndLimit4() throws Exception
{
final String version = DateTimes.nowUtc().toString();
final Set<DataSegment> segments = ImmutableSet.of(
@@ -176,14 +182,23 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
Assert.assertEquals(segments, announced);
+ Assert.assertEquals(
+ segments.size(),
+ getSegmentsMetadataManager().markAsUnusedSegmentsInInterval(
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01")
+ )
+ );
+
final KillUnusedSegmentsTask task =
new KillUnusedSegmentsTask(
null,
DATA_SOURCE,
Intervals.of("2018-01-01/2020-01-01"),
null,
- true,
- 1
+ false,
+ 1,
+ 4
);
Assert.assertEquals(TaskState.SUCCESS,
taskRunner.run(task).get().getStatusCode());
@@ -195,6 +210,7 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
Assert.assertEquals(Collections.emptyList(), unusedSegments);
Assert.assertEquals(4L, task.getNumBatchesProcessed());
+ Assert.assertEquals(4, task.getNumSegmentsKilled());
}
@Test
@@ -218,7 +234,8 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
Intervals.of("2018-01-01/2020-01-01"),
null,
true,
- 3
+ 3,
+ null
);
Assert.assertEquals(TaskState.SUCCESS,
taskRunner.run(task).get().getStatusCode());
@@ -229,7 +246,120 @@ public class KillUnusedSegmentsTaskTest extends
IngestionTestBase
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE,
Intervals.of("2019/2020"));
Assert.assertEquals(Collections.emptyList(), unusedSegments);
- Assert.assertEquals(2L, task.getNumBatchesProcessed());
+ Assert.assertEquals(3L, task.getNumBatchesProcessed());
+ Assert.assertEquals(4, task.getNumSegmentsKilled());
+ }
+
+ @Test
+ public void testComputeNextBatchSizeDefault()
+ {
+ final KillUnusedSegmentsTask task =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ null,
+ null
+ );
+ Assert.assertEquals(100, task.computeNextBatchSize(50));
+ }
+
+ @Test
+ public void testComputeNextBatchSizeWithBatchSizeLargerThanLimit()
+ {
+ final KillUnusedSegmentsTask task =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ 10,
+ 5
+ );
+ Assert.assertEquals(5, task.computeNextBatchSize(0));
+ }
+
+ @Test
+ public void testComputeNextBatchSizeWithBatchSizeSmallerThanLimit()
+ {
+ final KillUnusedSegmentsTask task =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ 5,
+ 10
+ );
+ Assert.assertEquals(5, task.computeNextBatchSize(0));
+ }
+
+ @Test
+ public void testComputeNextBatchSizeWithRemainingLessThanLimit()
+ {
+ final KillUnusedSegmentsTask task =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ 5,
+ 10
+ );
+ Assert.assertEquals(3, task.computeNextBatchSize(7));
+ }
+
+ @Test
+ public void testGetNumTotalBatchesDefault()
+ {
+ final KillUnusedSegmentsTask task =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ null,
+ null
+ );
+ Assert.assertNull(task.getNumTotalBatches());
+ }
+
+ @Test
+ public void testGetNumTotalBatchesWithBatchSizeLargerThanLimit()
+ {
+ final KillUnusedSegmentsTask task =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ 10,
+ 5
+ );
+ Assert.assertEquals(1, (int) task.getNumTotalBatches());
+ }
+
+ @Test
+ public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit()
+ {
+ final KillUnusedSegmentsTask task =
+ new KillUnusedSegmentsTask(
+ null,
+ DATA_SOURCE,
+ Intervals.of("2018-01-01/2020-01-01"),
+ null,
+ false,
+ 5,
+ 10
+ );
+ Assert.assertEquals(2, (int) task.getNumTotalBatches());
}
private static DataSegment newSegment(Interval interval, String version)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 2ea7fcedf1..31ec645b3f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -949,6 +949,7 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
Intervals.of("2011-04-01/P4D"),
null,
false,
+ null,
null
);
@@ -957,7 +958,7 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
Assert.assertEquals("merged statusCode", TaskState.SUCCESS,
status.getStatusCode());
Assert.assertEquals("num segments published", 0,
mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size());
- Assert.assertEquals("delete segment batch call count", 1,
mdc.getDeleteSegmentsCount());
+ Assert.assertEquals("delete segment batch call count", 2,
mdc.getDeleteSegmentsCount());
Assert.assertTrue(
"expected unused segments get killed",
expectedUnusedSegments.containsAll(mdc.getNuked()) &&
mdc.getNuked().containsAll(
@@ -970,6 +971,104 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
}
}
+ @Test
+ public void testKillUnusedSegmentsTaskWithMaxSegmentsToKill() throws
Exception
+ {
+ final File tmpSegmentDir = temporaryFolder.newFolder();
+
+ List<DataSegment> expectedUnusedSegments = Lists.transform(
+ ImmutableList.of(
+ "2011-04-01/2011-04-02",
+ "2011-04-02/2011-04-03",
+ "2011-04-04/2011-04-05"
+ ), new Function<String, DataSegment>()
+ {
+ @Override
+ public DataSegment apply(String input)
+ {
+ final Interval interval = Intervals.of(input);
+ try {
+ return DataSegment.builder()
+ .dataSource("test_kill_task")
+ .interval(interval)
+ .loadSpec(
+ ImmutableMap.of(
+ "type",
+ "local",
+ "path",
+ tmpSegmentDir.getCanonicalPath()
+ + "/druid/localStorage/wikipedia/"
+ + interval.getStart()
+ + "-"
+ + interval.getEnd()
+ + "/"
+ + "2011-04-6T16:52:46.119-05:00"
+ + "/0/index.zip"
+ )
+ )
+ .version("2011-04-6T16:52:46.119-05:00")
+ .dimensions(ImmutableList.of())
+ .metrics(ImmutableList.of())
+ .shardSpec(NoneShardSpec.instance())
+ .binaryVersion(9)
+ .size(0)
+ .build();
+ }
+ catch (IOException e) {
+ throw new ISE(e, "Error creating segments");
+ }
+ }
+ }
+ );
+
+ mdc.setUnusedSegments(expectedUnusedSegments);
+
+ // manually create local segments files
+ List<File> segmentFiles = new ArrayList<>();
+ for (DataSegment segment :
mdc.retrieveUnusedSegmentsForInterval("test_kill_task",
Intervals.of("2011-04-01/P4D"))) {
+ File file = new File((String) segment.getLoadSpec().get("path"));
+ FileUtils.mkdirp(file.getParentFile());
+ Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY);
+ segmentFiles.add(file);
+ }
+
+ final int maxSegmentsToKill = 2;
+ final Task killUnusedSegmentsTask =
+ new KillUnusedSegmentsTask(
+ null,
+ "test_kill_task",
+ Intervals.of("2011-04-01/P4D"),
+ null,
+ false,
+ null,
+ maxSegmentsToKill
+ );
+
+ final TaskStatus status = runTask(killUnusedSegmentsTask);
+ Assert.assertEquals(taskLocation, status.getLocation());
+ Assert.assertEquals("merged statusCode", TaskState.SUCCESS,
status.getStatusCode());
+ Assert.assertEquals("num segments published", 0,
mdc.getPublished().size());
+ Assert.assertEquals("num segments nuked", maxSegmentsToKill,
mdc.getNuked().size());
+ Assert.assertTrue(
+ "expected unused segments get killed",
+ expectedUnusedSegments.containsAll(mdc.getNuked())
+ );
+
+ int expectedNumOfSegmentsRemaining = segmentFiles.size() -
maxSegmentsToKill;
+ int actualNumOfSegmentsRemaining = 0;
+ for (File file : segmentFiles) {
+ if (file.exists()) {
+ actualNumOfSegmentsRemaining++;
+ }
+ }
+
+ Assert.assertEquals(
+ "Expected of segments deleted did not match expectations",
+ expectedNumOfSegmentsRemaining,
+ actualNumOfSegmentsRemaining
+ );
+ }
+
@Test
public void testRealtimeishTask() throws Exception
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index c8bf8fd28a..66af0a9730 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -42,6 +42,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Stream;
public class TestIndexerMetadataStorageCoordinator implements
IndexerMetadataStorageCoordinator
{
@@ -110,6 +111,22 @@ public class TestIndexerMetadataStorageCoordinator
implements IndexerMetadataSto
}
}
+ @Override
+ public List<DataSegment> retrieveUnusedSegmentsForInterval(String
dataSource, Interval interval, @Nullable Integer limit)
+ {
+ synchronized (unusedSegments) {
+ Stream<DataSegment> resultStream = unusedSegments.stream();
+
+ resultStream = resultStream.filter(ds -> !nuked.contains(ds));
+
+ if (limit != null) {
+ resultStream = resultStream.limit(limit);
+ }
+
+ return ImmutableList.copyOf(resultStream.iterator());
+ }
+ }
+
@Override
public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval
interval)
{
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
index 779c8acc7f..3676d68409 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
+
import java.util.Objects;
/**
@@ -40,14 +42,16 @@ public class ClientKillUnusedSegmentsTaskQuery implements
ClientTaskQuery
private final Interval interval;
private final Boolean markAsUnused;
private final Integer batchSize;
+ @Nullable private final Integer limit;
@JsonCreator
public ClientKillUnusedSegmentsTaskQuery(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
- @JsonProperty("markAsUnused") Boolean markAsUnused,
- @JsonProperty("batchSize") Integer batchSize
+ @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
+ @JsonProperty("batchSize") Integer batchSize,
+ @JsonProperty("limit") Integer limit
)
{
this.id = Preconditions.checkNotNull(id, "id");
@@ -55,6 +59,8 @@ public class ClientKillUnusedSegmentsTaskQuery implements
ClientTaskQuery
this.interval = interval;
this.markAsUnused = markAsUnused;
this.batchSize = batchSize;
+ Preconditions.checkArgument(limit == null || limit > 0, "limit must be >
0");
+ this.limit = limit;
}
@JsonProperty
@@ -96,6 +102,14 @@ public class ClientKillUnusedSegmentsTaskQuery implements
ClientTaskQuery
return batchSize;
}
+ @JsonProperty
+ @Nullable
+ public Integer getLimit()
+ {
+ return limit;
+ }
+
+
@Override
public boolean equals(Object o)
{
@@ -110,12 +124,13 @@ public class ClientKillUnusedSegmentsTaskQuery implements
ClientTaskQuery
&& Objects.equals(dataSource, that.dataSource)
&& Objects.equals(interval, that.interval)
&& Objects.equals(markAsUnused, that.markAsUnused)
- && Objects.equals(batchSize, that.batchSize);
+ && Objects.equals(batchSize, that.batchSize)
+ && Objects.equals(limit, that.limit);
}
@Override
public int hashCode()
{
- return Objects.hash(id, dataSource, interval, markAsUnused, batchSize);
+ return Objects.hash(id, dataSource, interval, markAsUnused, batchSize,
limit);
}
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index b3c70f0cdb..3d8f4b8586 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -123,17 +123,30 @@ public interface IndexerMetadataStorageCoordinator
Segments visibility
);
+ /**
+ * see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)}
+ */
+ default List<DataSegment> retrieveUnusedSegmentsForInterval(String
dataSource, Interval interval)
+ {
+ return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
+ }
+
/**
* Retrieve all published segments which include ONLY data within the given
interval and are marked as unused from the
* metadata store.
*
- * @param dataSource The data source the segments belong to
- * @param interval Filter the data segments to ones that include data in
this interval exclusively.
+ * @param dataSource The data source the segments belong to
+ * @param interval Filter the data segments to ones that include data in
this interval exclusively.
+ * @param limit The maximum number of unused segments to retrieve. If null,
no limit is applied.
*
* @return DataSegments which include ONLY data within the requested
interval and are marked as unused. Segments NOT
* returned here may include data in the interval
*/
- List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource,
Interval interval);
+ List<DataSegment> retrieveUnusedSegmentsForInterval(
+ String dataSource,
+ Interval interval,
+ @Nullable Integer limit
+ );
/**
* Mark as unused segments which include ONLY data within the given interval.
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 107dc4b9f9..6c4d523133 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -190,12 +190,22 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
@Override
public List<DataSegment> retrieveUnusedSegmentsForInterval(final String
dataSource, final Interval interval)
+ {
+ return retrieveUnusedSegmentsForInterval(dataSource, interval, null);
+ }
+
+ @Override
+ public List<DataSegment> retrieveUnusedSegmentsForInterval(
+ String dataSource,
+ Interval interval,
+ @Nullable Integer limit
+ )
{
final List<DataSegment> matchingSegments = connector.inReadOnlyTransaction(
(handle, status) -> {
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector,
dbTables, jsonMapper)
- .retrieveUnusedSegments(dataSource,
Collections.singletonList(interval))) {
+ .retrieveUnusedSegments(dataSource,
Collections.singletonList(interval), limit)) {
return ImmutableList.copyOf(iterator);
}
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index d98c5d4d08..46db26a56d 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -574,7 +574,7 @@ public class SqlSegmentsMetadataManager implements
SegmentsMetadataManager
}
try (final CloseableIterator<DataSegment> iterator =
- queryTool.retrieveUnusedSegments(dataSourceName,
intervals)) {
+ queryTool.retrieveUnusedSegments(dataSourceName, intervals,
null)) {
while (iterator.hasNext()) {
final DataSegment dataSegment = iterator.next();
timeline.addSegments(Iterators.singletonIterator(dataSegment));
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 13460e2695..45896a865e 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -37,6 +37,8 @@ import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
+import javax.annotation.Nullable;
+
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -104,7 +106,7 @@ public class SqlSegmentsMetadataQuery
final Collection<Interval> intervals
)
{
- return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS,
true);
+ return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS,
true, null);
}
/**
@@ -118,10 +120,11 @@ public class SqlSegmentsMetadataQuery
*/
public CloseableIterator<DataSegment> retrieveUnusedSegments(
final String dataSource,
- final Collection<Interval> intervals
+ final Collection<Interval> intervals,
+ @Nullable final Integer limit
)
{
- return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS,
false);
+ return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS,
false, limit);
}
/**
@@ -201,7 +204,7 @@ public class SqlSegmentsMetadataQuery
// Retrieve, then drop, since we can't write a WHERE clause directly.
final List<SegmentId> segments = ImmutableList.copyOf(
Iterators.transform(
- retrieveSegments(dataSource,
Collections.singletonList(interval), IntervalMode.CONTAINS, true),
+ retrieveSegments(dataSource,
Collections.singletonList(interval), IntervalMode.CONTAINS, true, null),
DataSegment::getId
)
);
@@ -213,7 +216,8 @@ public class SqlSegmentsMetadataQuery
final String dataSource,
final Collection<Interval> intervals,
final IntervalMode matchMode,
- final boolean used
+ final boolean used,
+ @Nullable final Integer limit
)
{
// Check if the intervals all support comparing as strings. If so, bake
them into the SQL.
@@ -259,6 +263,9 @@ public class SqlSegmentsMetadataQuery
.setFetchSize(connector.getStreamingFetchSize())
.bind("used", used)
.bind("dataSource", dataSource);
+ if (null != limit) {
+ sql.setMaxRows(limit);
+ }
if (compareAsString) {
final Iterator<Interval> iterator = intervals.iterator();
diff --git
a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index 51b4323a11..b4391dc0a8 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -84,9 +84,40 @@ public interface OverlordClient
* @return future with task ID
*/
default ListenableFuture<String> runKillTask(String idPrefix, String
dataSource, Interval interval)
+ {
+ return runKillTask(idPrefix, dataSource, interval, null);
+ }
+
+ /**
+ * Run a "kill" task for a particular datasource and interval. Shortcut to
{@link #runTask(String, Object)}.
+ *
+ * The kill task deletes all unused segment records from deep storage and
the metadata store. The task runs
+ * asynchronously after the API call returns. The resolved future is the ID
of the task, which can be used to
+ * monitor its progress through the {@link #taskStatus(String)} API.
+ *
+ * @param idPrefix Descriptive prefix to include at the start of task IDs
+ * @param dataSource Datasource to kill
+ * @param interval Interval to kill
+ * @param maxSegmentsToKill The maximum number of segments to kill
+ *
+ * @return future with task ID
+ */
+ default ListenableFuture<String> runKillTask(
+ String idPrefix,
+ String dataSource,
+ Interval interval,
+ @Nullable Integer maxSegmentsToKill
+ )
{
final String taskId = IdUtils.newTaskId(idPrefix,
ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval);
- final ClientTaskQuery taskQuery = new
ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false, null);
+ final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(
+ taskId,
+ dataSource,
+ interval,
+ false,
+ null,
+ maxSegmentsToKill
+ );
return FutureUtils.transform(runTask(taskId, taskQuery), ignored ->
taskId);
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index f4b6240b43..205947b003 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -33,6 +33,8 @@ import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
+
import java.util.Collection;
import java.util.List;
@@ -130,7 +132,12 @@ public class KillUnusedSegments implements CoordinatorDuty
}
try {
-
FutureUtils.getUnchecked(overlordClient.runKillTask("coordinator-issued",
dataSource, intervalToKill), true);
+ FutureUtils.getUnchecked(overlordClient.runKillTask(
+ "coordinator-issued",
+ dataSource,
+ intervalToKill,
+ maxSegmentsToKill
+ ), true);
++submittedTasks;
}
catch (Exception ex) {
@@ -148,6 +155,7 @@ public class KillUnusedSegments implements CoordinatorDuty
/**
* Calculates the interval for which segments are to be killed in a
datasource.
*/
+ @Nullable
private Interval findIntervalForKill(String dataSource)
{
final DateTime maxEndTime = ignoreRetainDuration
diff --git
a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 4a00f61300..1e146736da 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -342,7 +342,7 @@ public class DataSourcesResource
}
final Interval theInterval = Intervals.of(interval.replace('_', '/'));
try {
- FutureUtils.getUnchecked(overlordClient.runKillTask("api-issued",
dataSourceName, theInterval), true);
+ FutureUtils.getUnchecked(overlordClient.runKillTask("api-issued",
dataSourceName, theInterval, null), true);
return Response.ok().build();
}
catch (Exception e) {
diff --git
a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
index af9b2c8ec1..60edff9307 100644
---
a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
+++
b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
@@ -35,13 +35,21 @@ public class ClientKillUnusedSegmentsTaskQueryTest
private static final Interval INTERVAL = new Interval(START, START.plus(1));
private static final Boolean MARK_UNUSED = true;
private static final Integer BATCH_SIZE = 999;
+ private static final Integer LIMIT = 1000;
ClientKillUnusedSegmentsTaskQuery clientKillUnusedSegmentsQuery;
@Before
public void setUp()
{
- clientKillUnusedSegmentsQuery = new
ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL, true,
BATCH_SIZE);
+ clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery(
+ "killTaskId",
+ DATA_SOURCE,
+ INTERVAL,
+ true,
+ BATCH_SIZE,
+ LIMIT
+ );
}
@After
@@ -80,12 +88,18 @@ public class ClientKillUnusedSegmentsTaskQueryTest
Assert.assertEquals(BATCH_SIZE,
clientKillUnusedSegmentsQuery.getBatchSize());
}
+ @Test
+ public void testGetLimit()
+ {
+ Assert.assertEquals(LIMIT, clientKillUnusedSegmentsQuery.getLimit());
+ }
+
@Test
public void testEquals()
{
EqualsVerifier.forClass(ClientKillUnusedSegmentsTaskQuery.class)
.usingGetClass()
- .withNonnullFields("id", "dataSource", "interval",
"batchSize")
+ .withNonnullFields("id", "dataSource", "interval",
"batchSize", "limit")
.verify();
}
}
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 74e06bfb66..444e159411 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -932,6 +932,22 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
+ @Test
+ public void testSimpleUnusedListWithLimit() throws IOException
+ {
+ coordinator.announceHistoricalSegments(SEGMENTS);
+ markAllSegmentsUnused();
+ int limit = SEGMENTS.size() - 1;
+ Set<DataSegment> retreivedUnusedSegments = ImmutableSet.copyOf(
+ coordinator.retrieveUnusedSegmentsForInterval(
+ defaultSegment.getDataSource(),
+ defaultSegment.getInterval(),
+ limit
+ )
+ );
+ Assert.assertEquals(limit, retreivedUnusedSegments.size());
+ Assert.assertTrue(SEGMENTS.containsAll(retreivedUnusedSegments));
+ }
@Test
public void testUsedOverlapLow() throws IOException
diff --git
a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
index c1e48c496c..5b9a88d584 100644
---
a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
+++
b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
@@ -422,7 +422,14 @@ public class OverlordClientImplTest
public void test_taskPayload() throws ExecutionException,
InterruptedException, JsonProcessingException
{
final String taskID = "taskId_1";
- final ClientTaskQuery clientTaskQuery = new
ClientKillUnusedSegmentsTaskQuery(taskID, "test", null, null, null);
+ final ClientTaskQuery clientTaskQuery = new
ClientKillUnusedSegmentsTaskQuery(
+ taskID,
+ "test",
+ null,
+ null,
+ null,
+ null
+ );
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/" + taskID),
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 8f43b36bbe..039174eac7 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -216,11 +216,13 @@ public class KillUnusedSegmentsTest
private void runAndVerifyKillInterval(Interval expectedKillInterval)
{
+ int limit = config.getCoordinatorKillMaxSegments();
target.run(params);
Mockito.verify(overlordClient, Mockito.times(1)).runKillTask(
ArgumentMatchers.anyString(),
ArgumentMatchers.eq("DS1"),
- ArgumentMatchers.eq(expectedKillInterval)
+ ArgumentMatchers.eq(expectedKillInterval),
+ ArgumentMatchers.eq(limit)
);
}
diff --git
a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
index 5dec3154d6..5511514585 100644
---
a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
@@ -593,7 +593,7 @@ public class DataSourcesResourceTest
Interval theInterval = Intervals.of(interval.replace('_', '/'));
OverlordClient overlordClient =
EasyMock.createStrictMock(OverlordClient.class);
- EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1",
theInterval))
+ EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1",
theInterval, null))
.andReturn(Futures.immediateFuture(null));
EasyMock.replay(overlordClient, server);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]