This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3be94a09d81 Add embedded kill tasks that run on the Overlord (#18028)
3be94a09d81 is described below
commit 3be94a09d81d98bf5029c18832636e533cdbbb90
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Jun 4 00:16:56 2025 +0530
Add embedded kill tasks that run on the Overlord (#18028)
Description
-----------
This patch adds an embedded mode of running kill tasks on the Overlord
itself.
These embedded tasks
- kill unused segments as soon as they become eligible for kill.
- run on the Overlord and do not take up task slots.
- finish faster as a separate task process is not involved.
- kill a small number of segments per task, to ensure that locks on an
interval are not held for too long.
- skip locked intervals to avoid head-of-line blocking in kill tasks.
- do not require any configuration.
- can keep up with a large number of unused segments in the cluster.
Changes
---------
- Add `UnusedSegmentKiller`: `OverlordDuty` that launches embedded kill
tasks
- Add `UnusedSegmentKillerConfig` with the following fields:
- `enabled`: Turns on the segment killer on the Overlord
- `bufferPeriod`: Period for which segments are retained even after being
marked as unused.
- Add `EmbeddedKillTask`: extends `KillUnusedSegmentsTask` to modify some
behaviour
- Add `KillTaskToolbox`: simplified version of `TaskToolbox` to run
embedded kill tasks
- Make minor changes to `KillUnusedSegmentsTask` to modify some behaviour
for embedded tasks
- Add methods to retrieve unused segments and intervals to
`IndexerMetadataStorageCoordinator`
and `SqlSegmentsMetadataQuery`
- Add short-circuit to `TaskLockbox.remove` and make it idempotent
---
.../druid/storage/s3/S3DataSegmentKiller.java | 2 +-
indexing-service/pom.xml | 5 +
.../indexing/common/actions/SegmentNukeAction.java | 40 +-
.../common/task/KillUnusedSegmentsTask.java | 73 +++-
.../druid/indexing/common/task/TaskMetrics.java | 34 +-
.../apache/druid/indexing/common/task/Tasks.java | 6 +
.../druid/indexing/overlord/TaskLockbox.java | 15 +-
.../indexing/overlord/duty/KillTaskToolbox.java | 97 +++++
.../overlord/duty/OverlordDutyExecutor.java | 11 +-
.../overlord/duty/UnusedSegmentsKiller.java | 477 +++++++++++++++++++++
.../indexing/common/actions/TaskActionTestKit.java | 37 +-
.../indexing/common/task/IngestionTestBase.java | 4 +-
.../overlord/duty/OverlordDutyExecutorTest.java | 4 +-
.../overlord/duty/UnusedSegmentsKillerTest.java | 462 ++++++++++++++++++++
.../TestIndexerMetadataStorageCoordinator.java | 32 +-
.../druid/indexer/report/KillTaskReport.java | 2 +-
.../druid/segment/loading/SegmentKillResult.java | 40 +-
.../druid/java/util/metrics/MetricsVerifier.java | 36 +-
.../IndexerMetadataStorageCoordinator.java | 36 +-
.../IndexerSQLMetadataStorageCoordinator.java | 41 +-
.../metadata/SegmentsMetadataManagerConfig.java | 23 +-
.../druid/metadata/SqlSegmentsMetadataQuery.java | 119 ++++-
...rConfig.java => UnusedSegmentKillerConfig.java} | 39 +-
.../segment/CachedSegmentMetadataTransaction.java | 7 +
.../segment/SegmentMetadataReadTransaction.java | 6 +
.../segment/SqlSegmentMetadataTransaction.java | 6 +
.../coordinator/duty/KillUnusedSegments.java | 8 +-
...rSQLMetadataStorageCoordinatorReadOnlyTest.java | 2 +-
.../IndexerSQLMetadataStorageCoordinatorTest.java | 101 ++++-
.../SqlSegmentsMetadataManagerProviderTest.java | 2 +-
.../SqlSegmentsMetadataManagerSchemaPollTest.java | 4 +-
.../metadata/SqlSegmentsMetadataManagerTest.java | 3 +-
.../SqlSegmentsMetadataManagerTestBase.java | 2 +-
.../segment/SqlSegmentsMetadataManagerV2Test.java | 2 +-
.../cache/HeapMemorySegmentMetadataCacheTest.java | 2 +-
.../CoordinatorSegmentMetadataCacheTest.java | 2 +-
.../simulate/TestDruidLeaderSelector.java | 11 +-
.../java/org/apache/druid/cli/CliOverlord.java | 9 +-
.../main/java/org/apache/druid/cli/CliPeon.java | 9 +-
39 files changed, 1649 insertions(+), 162 deletions(-)
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
index fbe703a888e..79435869e7b 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
@@ -149,7 +149,7 @@ public class S3DataSegmentKiller implements
DataSegmentKiller
try {
deleteObjectsRequest.setKeys(chunkOfKeys);
log.info(
- "Removing from bucket: [%s] the following index files: [%s] from
s3!",
+ "Deleting the following segment files from S3 bucket[%s]: [%s]",
s3Bucket,
keysToDeleteStrings
);
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 5c722530acf..da30e431e8f 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -267,6 +267,11 @@
<artifactId>maven-resolver-api</artifactId>
<version>1.3.1</version>
</dependency>
+ <dependency>
+ <groupId>org.jdbi</groupId>
+ <artifactId>jdbi</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java
index 84445d3968a..6c73f664e3f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java
@@ -27,16 +27,23 @@ import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
import java.util.Set;
import java.util.stream.Collectors;
+/**
+ * Permanently deletes unused segments from the metadata store.
+ */
public class SegmentNukeAction implements TaskAction<Void>
{
+ private static final Logger log = new Logger(SegmentNukeAction.class);
+
private final Set<DataSegment> segments;
@JsonCreator
@@ -65,22 +72,25 @@ public class SegmentNukeAction implements TaskAction<Void>
TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(),
segments);
try {
- toolbox.getTaskLockbox().doInCriticalSection(
+ final Set<Interval> intervals =
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet());
+ int numDeletedSegments = toolbox.getTaskLockbox().doInCriticalSection(
task,
-
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
- CriticalAction.builder()
- .onValidLocks(
- () -> {
-
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
- return null;
- }
- )
- .onInvalidLocks(
- () -> {
- throw new ISE("Some locks for task[%s] are
already revoked", task.getId());
- }
- )
- .build()
+ intervals,
+ CriticalAction.<Integer>builder().onValidLocks(
+ () ->
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments)
+ ).onInvalidLocks(
+ () -> {
+ throw new ISE("Some locks for task[%s] are already revoked",
task.getId());
+ }
+ ).build()
+ );
+
+ log.info(
+ "Deleted [%d] segments from metadata store out of requested[%d],"
+ + " across [%d] intervals[%s], for task[%s] of datasource[%s].",
+ numDeletedSegments, segments.size(),
+ intervals.size(), intervals,
+ task.getId(), task.getDataSource()
);
}
catch (Exception e) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 06082a988d9..fe58c264ca8 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -212,7 +212,7 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
int nextBatchSize = computeNextBatchSize(numSegmentsKilled);
@Nullable Integer numTotalBatches = getNumTotalBatches();
List<DataSegment> unusedSegments;
- LOG.info(
+ logInfo(
"Starting kill for datasource[%s] in interval[%s] and versions[%s]
with batchSize[%d], up to limit[%d]"
+ " segments before maxUsedStatusLastUpdatedTime[%s] will be
deleted%s",
getDataSource(), getInterval(), getVersions(), batchSize, limit,
maxUsedStatusLastUpdatedTime,
@@ -236,9 +236,7 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
break;
}
- unusedSegments = toolbox.getTaskActionClient().submit(
- new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(),
getVersions(), nextBatchSize, maxUsedStatusLastUpdatedTime)
- );
+ unusedSegments = fetchNextBatchOfUnusedSegments(toolbox, nextBatchSize);
// Fetch locks each time as a revokal could have occurred in between
batches
final NavigableMap<DateTime, List<TaskLock>> taskLockMap
@@ -283,6 +281,7 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
// Nuke Segments
taskActionClient.submit(new SegmentNukeAction(new
HashSet<>(unusedSegments)));
+ emitMetric(toolbox.getEmitter(),
TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, unusedSegments.size());
// Determine segments to be killed
final List<DataSegment> segmentsToBeKilled
@@ -290,22 +289,27 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
final Set<DataSegment> segmentsNotKilled = new HashSet<>(unusedSegments);
segmentsToBeKilled.forEach(segmentsNotKilled::remove);
- LOG.infoSegments(
- segmentsNotKilled,
- "Skipping segment kill from deep storage as their load specs are
referenced by other segments."
- );
+
+ if (!segmentsNotKilled.isEmpty()) {
+ LOG.warn(
+ "Skipping kill of [%d] segments from deep storage as their load
specs are used by other segments.",
+ segmentsNotKilled.size()
+ );
+ }
toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
+ emitMetric(toolbox.getEmitter(),
TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, segmentsToBeKilled.size());
+
numBatchesProcessed++;
numSegmentsKilled += segmentsToBeKilled.size();
- LOG.info("Processed [%d] batches for kill task[%s].",
numBatchesProcessed, getId());
+ logInfo("Processed [%d] batches for kill task[%s].",
numBatchesProcessed, getId());
nextBatchSize = computeNextBatchSize(numSegmentsKilled);
} while (!unusedSegments.isEmpty() && (null == numTotalBatches ||
numBatchesProcessed < numTotalBatches));
final String taskId = getId();
- LOG.info(
+ logInfo(
"Finished kill task[%s] for dataSource[%s] and interval[%s]."
+ " Deleted total [%d] unused segments in [%d] batches.",
taskId, getDataSource(), getInterval(), numSegmentsKilled,
numBatchesProcessed
@@ -322,9 +326,8 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
}
@JsonIgnore
- @VisibleForTesting
@Nullable
- Integer getNumTotalBatches()
+ protected Integer getNumTotalBatches()
{
return null != limit ? (int) Math.ceil((double) limit / batchSize) : null;
}
@@ -336,6 +339,31 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) :
batchSize;
}
+ /**
+ * Fetches the next batch of unused segments that are eligible for kill.
+ */
+ protected List<DataSegment> fetchNextBatchOfUnusedSegments(TaskToolbox
toolbox, int nextBatchSize) throws IOException
+ {
+ return toolbox.getTaskActionClient().submit(
+ new RetrieveUnusedSegmentsAction(
+ getDataSource(),
+ getInterval(),
+ getVersions(),
+ nextBatchSize,
+ maxUsedStatusLastUpdatedTime
+ )
+ );
+ }
+
+ /**
+ * Logs the given info message. Exposed here to allow embedded kill tasks to
+ * suppress info logs.
+ */
+ protected void logInfo(String message, Object... args)
+ {
+ LOG.info(message, args);
+ }
+
private NavigableMap<DateTime, List<TaskLock>>
getNonRevokedTaskLockMap(TaskActionClient client) throws IOException
{
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
@@ -385,6 +413,10 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
response.getUpgradedToSegmentIds().forEach((parent, children) -> {
if (!CollectionUtils.isNullOrEmpty(children)) {
// Do not kill segment if its parent or any of its siblings still
exist in metadata store
+ LOG.info(
+ "Skipping kill of segments[%s] as its load spec is also used
by segment IDs[%s].",
+ parentIdToUnusedSegments.get(parent), children
+ );
parentIdToUnusedSegments.remove(parent);
}
});
@@ -402,10 +434,25 @@ public class KillUnusedSegmentsTask extends
AbstractFixedIntervalTask
return parentIdToUnusedSegments.values()
.stream()
.flatMap(Set::stream)
- .filter(segment ->
!usedSegmentLoadSpecs.contains(segment.getLoadSpec()))
+ .filter(segment ->
!isSegmentLoadSpecPresentIn(segment, usedSegmentLoadSpecs))
.collect(Collectors.toList());
}
+ /**
+ * @return true if the load spec of the segment is present in the given set
of
+ * used load specs.
+ */
+ private boolean isSegmentLoadSpecPresentIn(
+ DataSegment segment,
+ Set<Map<String, Object>> usedSegmentLoadSpecs
+ )
+ {
+ boolean isPresent = usedSegmentLoadSpecs.contains(segment.getLoadSpec());
+ if (isPresent) {
+ LOG.info("Skipping kill of segment[%s] as its load spec is also used by
other segments.", segment);
+ }
+ return isPresent;
+ }
@Override
public LookupLoadingSpec getLookupLoadingSpec()
diff --git
a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskMetrics.java
similarity index 52%
copy from
server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
copy to
indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskMetrics.java
index cb02f7c8f38..86456d12fdc 100644
---
a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskMetrics.java
@@ -17,35 +17,21 @@
* under the License.
*/
-package org.apache.druid.metadata.segment;
-
-import org.skife.jdbi.v2.Handle;
-
-import java.io.Closeable;
+package org.apache.druid.indexing.common.task;
/**
- * Represents a single transaction involving read of segment metadata into
- * the metadata store. A transaction is associated with a single instance of a
- * {@link Handle} and is meant to be short-lived.
+ * Task-related metrics emitted by the Druid cluster.
*/
-public interface SegmentMetadataReadTransaction
- extends DatasourceSegmentMetadataReader, Closeable
+public class TaskMetrics
{
- /**
- * @return The JDBI handle used in this transaction
- */
- Handle getHandle();
-
- /**
- * Completes the transaction by either committing it or rolling it back.
- */
- @Override
- void close();
-
- @FunctionalInterface
- interface Callback<T>
+ private TaskMetrics()
{
- T inTransaction(SegmentMetadataReadTransaction transaction) throws
Exception;
+ // no instantiation
}
+ public static final String RUN_DURATION = "task/run/time";
+
+ public static final String SEGMENTS_DELETED_FROM_METADATA_STORE =
"segment/killed/metadataStore/count";
+ public static final String SEGMENTS_DELETED_FROM_DEEPSTORE =
"segment/killed/deepStorage/count";
+ public static final String FILES_DELETED_FROM_DEEPSTORE =
"segment/killed/deepStorageFile/count";
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index 341ac0c9326..8e62113f0c7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -32,6 +32,12 @@ public class Tasks
public static final int DEFAULT_BATCH_INDEX_TASK_PRIORITY = 50;
public static final int DEFAULT_MERGE_TASK_PRIORITY = 25;
+ /**
+ * Priority of embedded kill tasks. Kept lower than batch and realtime tasks
+ to allow them to preempt embedded kill tasks.
+ */
+ public static final int DEFAULT_EMBEDDED_KILL_TASK_PRIORITY = 25;
+
static {
Verify.verify(DEFAULT_MERGE_TASK_PRIORITY ==
DataSourceCompactionConfig.DEFAULT_COMPACTION_TASK_PRIORITY);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 9815bb14a71..99168143f84 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -1232,16 +1232,17 @@ public class TaskLockbox
{
giant.lock();
try {
- try {
- log.info("Removing task[%s] from activeTasks", task.getId());
- cleanupUpgradeAndPendingSegments(task);
- unlockAll(task);
- }
- finally {
- activeTasks.remove(task.getId());
+ if (!activeTasks.contains(task.getId())) {
+ return;
}
+ log.info("Removing task[%s] from activeTasks", task.getId());
+ cleanupUpgradeAndPendingSegments(task);
+ unlockAll(task);
}
finally {
+ if (task != null) {
+ activeTasks.remove(task.getId());
+ }
giant.unlock();
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/KillTaskToolbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/KillTaskToolbox.java
new file mode 100644
index 00000000000..dd2afe6017d
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/KillTaskToolbox.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
+
+import java.io.OutputStream;
+
+/**
+ * Wrapper over {@link TaskToolbox} used for embedded kill tasks launched by
+ * {@link UnusedSegmentsKiller}.
+ */
+public class KillTaskToolbox
+{
+ /**
+ * Creates a {@link TaskToolbox} with just enough dependencies to make the
+ * embedded kill tasks work in {@link UnusedSegmentsKiller}.
+ */
+ static TaskToolbox create(
+ TaskActionClient taskActionClient,
+ DataSegmentKiller dataSegmentKiller,
+ ServiceEmitter emitter
+ )
+ {
+ final ObjectMapper mapper = DefaultObjectMapper.INSTANCE;
+ final IndexIO indexIO = new IndexIO(mapper, ColumnConfig.DEFAULT);
+
+ return new TaskToolbox.Builder()
+ .taskActionClient(taskActionClient)
+ .dataSegmentKiller(dataSegmentKiller)
+ .taskReportFileWriter(NoopReportWriter.INSTANCE)
+ .indexIO(indexIO)
+ .indexMergerV9(new IndexMergerV9(mapper, indexIO,
TmpFileSegmentWriteOutMediumFactory.instance(), false))
+ .emitter(emitter)
+ .build();
+ }
+
+ /**
+ * Noop report writer.
+ */
+ private static class NoopReportWriter extends SingleFileTaskReportFileWriter
+ {
+ private static final NoopReportWriter INSTANCE = new NoopReportWriter();
+
+ private NoopReportWriter()
+ {
+ super(null);
+ }
+
+ @Override
+ public void setObjectMapper(ObjectMapper objectMapper)
+ {
+ // Do nothing
+ }
+
+ @Override
+ public void write(String taskId, TaskReport.ReportMap reports)
+ {
+ // Do nothing, metrics are emitted by the KillUnusedSegmentsTask itself
+ }
+
+ @Override
+ public OutputStream openReportOutputStream(String taskId)
+ {
+ throw DruidException.defensive("Cannot write reports using this
reporter");
+ }
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java
index 9ba4e90f0f6..689d0876894 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java
@@ -97,7 +97,12 @@ public class OverlordDutyExecutor
initExecutor();
final DutySchedule schedule = duty.getSchedule();
- final String dutyName = duty.getClass().getName();
+ final String dutyName = duty.getClass().getSimpleName();
+
+ if (schedule == null || schedule.getPeriodMillis() <= 0) {
+ log.info("Not scheduling overlord duty[%s] as it has no period
specified.", dutyName);
+ return;
+ }
ScheduledExecutors.scheduleWithFixedDelay(
exec,
@@ -108,13 +113,13 @@ public class OverlordDutyExecutor
duty.run();
}
catch (Exception e) {
- log.error(e, "Error while running duty [%s]", dutyName);
+ log.error(e, "Error while running duty[%s]", dutyName);
}
}
);
log.info(
- "Scheduled overlord duty [%s] with initial delay [%d], period [%d].",
+ "Scheduled overlord duty[%s] with initial delay[%d], period[%d].",
dutyName, schedule.getInitialDelayMillis(), schedule.getPeriodMillis()
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java
new file mode 100644
index 00000000000..bce95ffa258
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link OverlordDuty} to delete unused segments from metadata store and the
+ * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments
+ * of a single datasource-interval.
+ *
+ * @see SegmentsMetadataManagerConfig to enable the cleanup
+ * @see org.apache.druid.server.coordinator.duty.KillUnusedSegments for legacy
+ * mode of killing unused segments via Coordinator duties
+ */
+public class UnusedSegmentsKiller implements OverlordDuty
+{
+ private static final EmittingLogger log = new
EmittingLogger(UnusedSegmentsKiller.class);
+
+ private static final String TASK_ID_PREFIX = "overlord-issued";
+
+ private static final int INITIAL_KILL_QUEUE_SIZE = 1000;
+ private static final int MAX_INTERVALS_TO_KILL_IN_DATASOURCE = 10_000;
+ private static final int MAX_SEGMENTS_TO_KILL_IN_INTERVAL = 1000;
+
+ /**
+ * Period after which the queue is reset even if there are existing jobs in
queue.
+ */
+ private static final Duration QUEUE_RESET_PERIOD = Duration.standardDays(1);
+
+ /**
+ * Duration for which a kill task is allowed to run.
+ */
+ private static final Duration MAX_TASK_DURATION =
Duration.standardMinutes(10);
+
+ private final ServiceEmitter emitter;
+ private final TaskLockbox taskLockbox;
+ private final DruidLeaderSelector leaderSelector;
+ private final DataSegmentKiller dataSegmentKiller;
+
+ private final UnusedSegmentKillerConfig killConfig;
+ private final TaskActionClientFactory taskActionClientFactory;
+ private final IndexerMetadataStorageCoordinator storageCoordinator;
+
+ /**
+ * Single-threaded executor to process kill jobs.
+ */
+ private final ScheduledExecutorService exec;
+ private int previousLeaderTerm;
+ private final AtomicReference<DateTime> lastResetTime = new
AtomicReference<>(null);
+
+ private final AtomicReference<TaskInfo> currentTaskInfo = new
AtomicReference<>(null);
+
+ /**
+ * Queue of kill candidates. Use a PriorityBlockingQueue to ensure
thread-safety
+ * since this queue is accessed by both {@link #run()} and {@link
#startNextJobInKillQueue}.
+ */
+ private final PriorityBlockingQueue<KillCandidate> killQueue;
+
+ @Inject
+ public UnusedSegmentsKiller(
+ SegmentsMetadataManagerConfig config,
+ TaskActionClientFactory taskActionClientFactory,
+ IndexerMetadataStorageCoordinator storageCoordinator,
+ @IndexingService DruidLeaderSelector leaderSelector,
+ ScheduledExecutorFactory executorFactory,
+ DataSegmentKiller dataSegmentKiller,
+ TaskLockbox taskLockbox,
+ ServiceEmitter emitter
+ )
+ {
+ this.emitter = emitter;
+ this.taskLockbox = taskLockbox;
+ this.leaderSelector = leaderSelector;
+ this.dataSegmentKiller = dataSegmentKiller;
+ this.storageCoordinator = storageCoordinator;
+ this.taskActionClientFactory = taskActionClientFactory;
+
+ this.killConfig = config.getKillUnused();
+
+ if (isEnabled()) {
+ this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s");
+ this.killQueue = new PriorityBlockingQueue<>(
+ INITIAL_KILL_QUEUE_SIZE,
+ Ordering.from(Comparators.intervalsByEndThenStart())
+ .onResultOf(candidate -> candidate.interval)
+ );
+ } else {
+ this.exec = null;
+ this.killQueue = null;
+ }
+ }
+
+ @Override
+ public boolean isEnabled()
+ {
+ return killConfig.isEnabled();
+ }
+
+ /**
+ * Ensures that things are moving along and the kill queue is not stuck.
+ * Updates the state if leadership changes or if the queue needs to be reset.
+ */
+ @Override
+ public void run()
+ {
+ if (!isEnabled()) {
+ return;
+ }
+
+ updateStateIfNewLeader();
+ if (shouldRebuildKillQueue()) {
+ // Clear the killQueue to stop further processing of already queued jobs
+ killQueue.clear();
+ exec.submit(() -> {
+ rebuildKillQueue();
+ startNextJobInKillQueue();
+ });
+ }
+
+ // Cancel the current task if it has been running for too long
+ final TaskInfo taskInfo = currentTaskInfo.get();
+ if (taskInfo != null && !taskInfo.future.isDone()
+ && taskInfo.sinceTaskStarted.hasElapsed(MAX_TASK_DURATION)) {
+ log.warn(
+ "Cancelling kill task[%s] as it has been running for [%,d] millis.",
+ taskInfo.taskId, taskInfo.sinceTaskStarted.millisElapsed()
+ );
+ taskInfo.future.cancel(true);
+ }
+ }
+
+ @Override
+ public DutySchedule getSchedule()
+ {
+ if (isEnabled()) {
+ // Check every hour that the kill queue is being processed normally
+ log.info("Scheduling is enabled to launch embedded kill tasks.");
+ return new DutySchedule(Duration.standardHours(1).getMillis(),
Duration.standardMinutes(1).getMillis());
+ } else {
+ return new DutySchedule(0, 0);
+ }
+ }
+
+ private void updateStateIfNewLeader()
+ {
+ final int currentLeaderTerm = leaderSelector.localTerm();
+ if (currentLeaderTerm != previousLeaderTerm) {
+ previousLeaderTerm = currentLeaderTerm;
+ killQueue.clear();
+ lastResetTime.set(null);
+ }
+ }
+
+ /**
+ * Returns true if the kill queue is empty or if the queue has not been reset
+ * yet or if {@code (lastResetTime + resetPeriod) < (now + 1)}.
+ */
+ private boolean shouldRebuildKillQueue()
+ {
+ final DateTime now = DateTimes.nowUtc().plus(1);
+
+ return killQueue.isEmpty()
+ || lastResetTime.get() == null
+ || lastResetTime.get().plus(QUEUE_RESET_PERIOD).isBefore(now);
+ }
+
+ /**
+ * Clears the kill queue and adds fresh jobs.
+ * This method need not handle race conditions as it is always run on
+ * {@link #exec} which is single-threaded.
+ */
+ private void rebuildKillQueue()
+ {
+ final Stopwatch resetDuration = Stopwatch.createStarted();
+ try {
+ killQueue.clear();
+ if (!leaderSelector.isLeader()) {
+ log.info("Not rebuilding kill queue as we are not leader anymore.");
+ return;
+ }
+
+ final Set<String> dataSources =
storageCoordinator.retrieveAllDatasourceNames();
+
+ final Map<String, Integer> dataSourceToIntervalCounts = new HashMap<>();
+ for (String dataSource : dataSources) {
+ storageCoordinator.retrieveUnusedSegmentIntervals(dataSource,
MAX_INTERVALS_TO_KILL_IN_DATASOURCE).forEach(
+ interval -> {
+ dataSourceToIntervalCounts.merge(dataSource, 1, Integer::sum);
+ killQueue.offer(new KillCandidate(dataSource, interval));
+ }
+ );
+ }
+
+ lastResetTime.set(DateTimes.nowUtc());
+ log.info(
+ "Queued [%d] kill jobs for [%d] datasources in [%d] millis.",
+ killQueue.size(), dataSources.size(), resetDuration.millisElapsed()
+ );
+ dataSourceToIntervalCounts.forEach(
+ (dataSource, intervalCount) -> emitMetric(
+ Metric.UNUSED_SEGMENT_INTERVALS,
+ intervalCount,
+ Map.of(DruidMetrics.DATASOURCE, dataSource)
+ )
+ );
+ emitMetric(Metric.QUEUE_RESET_TIME, resetDuration.millisElapsed(), null);
+ }
+ catch (Throwable t) {
+ log.makeAlert(t, "Error while resetting kill queue.");
+ }
+ }
+
+ /**
+ * Launches an {@link EmbeddedKillTask} on the {@link #exec} for the next
+ * {@link KillCandidate} in the {@link #killQueue}. This method returns
+ * immediately.
+ */
+ private void startNextJobInKillQueue()
+ {
+ if (!isEnabled() || !leaderSelector.isLeader()) {
+ return;
+ }
+
+ if (killQueue.isEmpty()) {
+ // If the last entry has been processed, emit the total processing time
and exit
+ final DateTime lastQueueResetTime = lastResetTime.get();
+ if (lastQueueResetTime != null) {
+ long processTimeMillis = DateTimes.nowUtc().getMillis() -
lastQueueResetTime.getMillis();
+ emitMetric(Metric.QUEUE_PROCESS_TIME, processTimeMillis, null);
+ }
+ return;
+ }
+
+ try {
+ final KillCandidate candidate = killQueue.poll();
+ if (candidate == null) {
+ return;
+ }
+
+ final String taskId = IdUtils.newTaskId(
+ TASK_ID_PREFIX,
+ KillUnusedSegmentsTask.TYPE,
+ candidate.dataSource,
+ candidate.interval
+ );
+
+ final Future<?> taskFuture = exec.submit(() -> {
+ runKillTask(candidate, taskId);
+ startNextJobInKillQueue();
+ });
+ currentTaskInfo.set(new TaskInfo(taskId, taskFuture));
+ }
+ catch (Throwable t) {
+ log.makeAlert(t, "Error while processing kill queue.");
+ currentTaskInfo.set(null);
+ }
+ }
+
+ /**
+ * Launches an embedded kill task for the given candidate.
+ */
+ private void runKillTask(KillCandidate candidate, String taskId)
+ {
+ final Stopwatch taskRunTime = Stopwatch.createStarted();
+ final EmbeddedKillTask killTask = new EmbeddedKillTask(
+ taskId,
+ candidate,
+ DateTimes.nowUtc().minus(killConfig.getBufferPeriod())
+ );
+
+ final TaskActionClient taskActionClient =
taskActionClientFactory.create(killTask);
+ final TaskToolbox taskToolbox = KillTaskToolbox.create(taskActionClient,
dataSegmentKiller, emitter);
+
+ final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ IndexTaskUtils.setTaskDimensions(metricBuilder, killTask);
+
+ try {
+ taskLockbox.add(killTask);
+ final boolean isReady = killTask.isReady(taskActionClient);
+ if (!isReady) {
+ emitter.emit(metricBuilder.setMetric(Metric.SKIPPED_INTERVALS, 1L));
+ return;
+ }
+
+ final TaskStatus status = killTask.runTask(taskToolbox);
+
+ IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
+ emitter.emit(metricBuilder.setMetric(TaskMetrics.RUN_DURATION,
taskRunTime.millisElapsed()));
+ }
+ catch (Throwable t) {
+ log.error(t, "Embedded kill task[%s] failed.", killTask.getId());
+
+ IndexTaskUtils.setTaskStatusDimensions(metricBuilder,
TaskStatus.failure(taskId, "Unknown error"));
+ emitter.emit(metricBuilder.setMetric(TaskMetrics.RUN_DURATION,
taskRunTime.millisElapsed()));
+ }
+ finally {
+ cleanupLocksSilently(killTask);
+ emitMetric(Metric.PROCESSED_KILL_JOBS, 1L,
Map.of(DruidMetrics.DATASOURCE, candidate.dataSource));
+ }
+ }
+
+ private void cleanupLocksSilently(EmbeddedKillTask killTask)
+ {
+ try {
+ taskLockbox.remove(killTask);
+ }
+ catch (Throwable t) {
+ log.error(t, "Error while cleaning up locks for kill task[%s].",
killTask.getId());
+ }
+ }
+
+ private void emitMetric(String metricName, long value, Map<String, String>
dimensions)
+ {
+ final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder();
+ if (dimensions != null) {
+ dimensions.forEach(builder::setDimension);
+ }
+ emitter.emit(builder.setMetric(metricName, value));
+ }
+
+ /**
+ * Represents a single candidate interval that contains unused segments.
+ */
+ private static class KillCandidate
+ {
+ private final String dataSource;
+ private final Interval interval;
+
+ private KillCandidate(String dataSource, Interval interval)
+ {
+ this.dataSource = dataSource;
+ this.interval = interval;
+ }
+ }
+
+ /**
+ * Info of the currently running task.
+ */
+ private static class TaskInfo
+ {
+ private final Stopwatch sinceTaskStarted;
+ private final String taskId;
+ private final Future<?> future;
+
+ private TaskInfo(String taskId, Future<?> future)
+ {
+ this.future = future;
+ this.taskId = taskId;
+ this.sinceTaskStarted = Stopwatch.createStarted();
+ }
+ }
+
+ /**
+ * Embedded kill task. Unlike other task types, this task is not persisted
and
+ * does not run on a worker or indexer. Hence, it doesn't take up any task
slots.
+ * To ensure that locks are held very briefly over short segment intervals,
+ * this kill task processes:
+ * <ul>
+ * <li>only 1 unused segment interval</li>
+ * <li>only 1 batch of upto 1000 unused segments</li>
+ * </ul>
+ */
+ private class EmbeddedKillTask extends KillUnusedSegmentsTask
+ {
+ private EmbeddedKillTask(
+ String taskId,
+ KillCandidate candidate,
+ DateTime maxUpdatedTimeOfEligibleSegment
+ )
+ {
+ super(
+ taskId,
+ candidate.dataSource,
+ candidate.interval,
+ null,
+ Map.of(Tasks.PRIORITY_KEY,
Tasks.DEFAULT_EMBEDDED_KILL_TASK_PRIORITY),
+ null,
+ null,
+ maxUpdatedTimeOfEligibleSegment
+ );
+ }
+
+ @Nullable
+ @Override
+ protected Integer getNumTotalBatches()
+ {
+ // Do everything in a single batch so that locks are not held for very
long
+ return 1;
+ }
+
+ @Override
+ protected List<DataSegment> fetchNextBatchOfUnusedSegments(TaskToolbox
toolbox, int nextBatchSize)
+ {
+ // Kill only 1000 segments in the batch so that locks are not held for
very long
+ return storageCoordinator.retrieveUnusedSegmentsWithExactInterval(
+ getDataSource(),
+ getInterval(),
+ getMaxUsedStatusLastUpdatedTime(),
+ MAX_SEGMENTS_TO_KILL_IN_INTERVAL
+ );
+ }
+
+ @Override
+ protected void logInfo(String message, Object... args)
+ {
+ // Reduce the level of embedded task info logs to reduce noise on the
Overlord
+ log.debug(message, args);
+ }
+ }
+
+ public static class Metric
+ {
+ public static final String QUEUE_RESET_TIME =
"segment/kill/queueReset/time";
+ public static final String QUEUE_PROCESS_TIME =
"segment/kill/queueProcess/time";
+ public static final String PROCESSED_KILL_JOBS =
"segment/kill/jobsProcessed/count";
+
+ public static final String SKIPPED_INTERVALS =
"segment/kill/skippedIntervals/count";
+ public static final String UNUSED_SEGMENT_INTERVALS =
"segment/kill/unusedIntervals/count";
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
index 3a63495928a..2ddd0632291 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
@@ -30,6 +30,8 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
@@ -44,7 +46,6 @@ import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import
org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.rules.ExternalResource;
@@ -55,6 +56,7 @@ public class TaskActionTestKit extends ExternalResource
private TaskStorage taskStorage;
private TaskLockbox taskLockbox;
+ private StubServiceEmitter emitter;
private TestDerbyConnector testDerbyConnector;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private TaskActionToolbox taskActionToolbox;
@@ -64,6 +66,21 @@ public class TaskActionTestKit extends ExternalResource
private boolean useSegmentMetadataCache = false;
private boolean skipSegmentPayloadFetchForAllocation = new
TaskLockConfig().isBatchAllocationReduceMetadataIO();
+ public StubServiceEmitter getServiceEmitter()
+ {
+ return emitter;
+ }
+
+ public TestDerbyConnector getTestDerbyConnector()
+ {
+ return testDerbyConnector;
+ }
+
+ public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
+ {
+ return metadataStorageTablesConfig;
+ }
+
public TaskLockbox getTaskLockbox()
{
return taskLockbox;
@@ -97,6 +114,7 @@ public class TaskActionTestKit extends ExternalResource
@Override
public void before()
{
+ emitter = new StubServiceEmitter();
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new
Period("PT24H")));
testDerbyConnector = new TestDerbyConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
@@ -109,7 +127,7 @@ public class TaskActionTestKit extends ExternalResource
testDerbyConnector
);
- final SqlSegmentMetadataTransactionFactory transactionFactory =
setupTransactionFactory(objectMapper);
+ final SqlSegmentMetadataTransactionFactory transactionFactory =
setupTransactionFactory(objectMapper, emitter);
metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
transactionFactory,
objectMapper,
@@ -142,10 +160,10 @@ public class TaskActionTestKit extends ExternalResource
taskLockbox,
taskLockConfig,
metadataStorageCoordinator,
- NoopServiceEmitter.instance(),
+ emitter,
ScheduledExecutors::fixed
),
- NoopServiceEmitter.instance(),
+ emitter,
EasyMock.createMock(SupervisorManager.class),
objectMapper
);
@@ -163,7 +181,10 @@ public class TaskActionTestKit extends ExternalResource
syncSegmentMetadataCache();
}
- private SqlSegmentMetadataTransactionFactory
setupTransactionFactory(ObjectMapper objectMapper)
+ private SqlSegmentMetadataTransactionFactory setupTransactionFactory(
+ ObjectMapper objectMapper,
+ ServiceEmitter emitter
+ )
{
metadataCachePollExec = new
BlockingExecutorService("test-cache-poll-exec");
SegmentMetadataCache.UsageMode cacheMode
@@ -173,12 +194,12 @@ public class TaskActionTestKit extends ExternalResource
segmentMetadataCache = new HeapMemorySegmentMetadataCache(
objectMapper,
- Suppliers.ofInstance(new
SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
+ Suppliers.ofInstance(new
SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode, null)),
Suppliers.ofInstance(metadataStorageTablesConfig),
new NoopSegmentSchemaCache(),
testDerbyConnector,
(poolSize, name) -> new WrappingScheduledExecutorService(name,
metadataCachePollExec, false),
- NoopServiceEmitter.instance()
+ emitter
);
final TestDruidLeaderSelector leaderSelector = new
TestDruidLeaderSelector();
@@ -190,7 +211,7 @@ public class TaskActionTestKit extends ExternalResource
testDerbyConnector,
leaderSelector,
segmentMetadataCache,
- NoopServiceEmitter.instance()
+ emitter
)
{
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 848a2bb91ce..9972340ae0c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -181,7 +181,7 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
segmentMetadataCache,
segmentSchemaCache,
derbyConnectorRule.getConnector(),
- () -> new SegmentsMetadataManagerConfig(null, null),
+ () -> new SegmentsMetadataManagerConfig(null, null, null),
derbyConnectorRule.metadataTablesConfigSupplier(),
CentralizedDatasourceSchemaConfig::create,
NoopServiceEmitter.instance(),
@@ -323,7 +323,7 @@ public abstract class IngestionTestBase extends
InitializedNullHandlingTest
: SegmentMetadataCache.UsageMode.NEVER;
segmentMetadataCache = new HeapMemorySegmentMetadataCache(
objectMapper,
- Suppliers.ofInstance(new
SegmentsMetadataManagerConfig(Period.millis(10), cacheMode)),
+ Suppliers.ofInstance(new
SegmentsMetadataManagerConfig(Period.millis(10), cacheMode, null)),
derbyConnectorRule.metadataTablesConfigSupplier(),
segmentSchemaCache,
derbyConnectorRule.getConnector(),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java
index 00169b0a955..bc6be505c62 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java
@@ -36,11 +36,11 @@ public class OverlordDutyExecutorTest
{
OverlordDuty testDuty1 = Mockito.mock(OverlordDuty.class);
Mockito.when(testDuty1.isEnabled()).thenReturn(true);
- Mockito.when(testDuty1.getSchedule()).thenReturn(new DutySchedule(0, 0));
+ Mockito.when(testDuty1.getSchedule()).thenReturn(new DutySchedule(1, 0));
OverlordDuty testDuty2 = Mockito.mock(OverlordDuty.class);
Mockito.when(testDuty2.isEnabled()).thenReturn(true);
- Mockito.when(testDuty2.getSchedule()).thenReturn(new DutySchedule(0, 0));
+ Mockito.when(testDuty2.getSchedule()).thenReturn(new DutySchedule(1, 0));
ScheduledExecutorFactory executorFactory =
Mockito.mock(ScheduledExecutorFactory.class);
ScheduledExecutorService executorService =
Mockito.mock(ScheduledExecutorService.class);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
new file mode 100644
index 00000000000..64a551fdee4
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionTestKit;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
+import org.apache.druid.indexing.test.TestDataSegmentKiller;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
+import
org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
/**
 * Tests for {@code UnusedSegmentsKiller}. Kill jobs are run on a
 * {@code BlockingExecutorService} so that each test controls exactly when (and
 * how many) queued jobs execute, via {@link #finishQueuedKillJobs()}.
 */
public class UnusedSegmentsKillerTest
{
  @Rule
  public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();

  // Shared fixture: 10 day-granularity WIKI segments, one per interval
  private static final List<DataSegment> WIKI_SEGMENTS_1X10D =
      CreateDataSegments.ofDatasource(TestDataSource.WIKI)
                        .forIntervals(10, Granularities.DAY)
                        .eachOfSize(500);

  private StubServiceEmitter emitter;
  private UnusedSegmentsKiller killer;
  private BlockingExecutorService killExecutor;
  private UnusedSegmentKillerConfig killerConfig;
  private TestDruidLeaderSelector leaderSelector;
  private TestDataSegmentKiller dataSegmentKiller;
  private IndexerMetadataStorageCoordinator storageCoordinator;

  @Before
  public void setup()
  {
    emitter = taskActionTestKit.getServiceEmitter();
    leaderSelector = new TestDruidLeaderSelector();
    dataSegmentKiller = new TestDataSegmentKiller();
    // Enabled with a zero buffer period so unused segments are immediately eligible
    killerConfig = new UnusedSegmentKillerConfig(true, Period.ZERO);
    killExecutor = new BlockingExecutorService("UnusedSegmentsKillerTest-%s");
    storageCoordinator = taskActionTestKit.getMetadataStorageCoordinator();
    initKiller();
  }

  /**
   * (Re-)creates the killer using the current {@link #killerConfig}. Called
   * again by tests that change the config after {@link #setup()}.
   */
  private void initKiller()
  {
    killer = new UnusedSegmentsKiller(
        new SegmentsMetadataManagerConfig(
            null,
            SegmentMetadataCache.UsageMode.ALWAYS,
            killerConfig
        ),
        task -> new LocalTaskActionClient(task, taskActionTestKit.getTaskActionToolbox()),
        storageCoordinator,
        leaderSelector,
        (corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(nameFormat, killExecutor, true),
        dataSegmentKiller,
        taskActionTestKit.getTaskLockbox(),
        taskActionTestKit.getServiceEmitter()
    );
  }

  /**
   * Runs all kill jobs currently queued on the blocking executor.
   */
  private void finishQueuedKillJobs()
  {
    killExecutor.finishAllPendingTasks();
  }

  @Test
  public void test_getSchedule_returnsOneHourPeriod_ifEnabled()
  {
    final DutySchedule schedule = killer.getSchedule();
    Assert.assertEquals(Duration.standardHours(1).getMillis(), schedule.getPeriodMillis());
    Assert.assertEquals(Duration.standardMinutes(1).getMillis(), schedule.getInitialDelayMillis());
  }

  @Test
  public void test_getSchedule_returnsZeroPeriod_ifDisabled()
  {
    killerConfig = new UnusedSegmentKillerConfig(false, null);
    initKiller();

    final DutySchedule schedule = killer.getSchedule();
    Assert.assertEquals(0, schedule.getPeriodMillis());
    Assert.assertEquals(0, schedule.getInitialDelayMillis());
  }

  @Test
  public void test_run_startsProcessing_ifEnabled()
  {
    Assert.assertFalse(killExecutor.hasPendingTasks());
    Assert.assertTrue(killer.isEnabled());

    killer.run();
    Assert.assertTrue(killExecutor.hasPendingTasks());
  }

  @Test
  public void test_run_isNoop_ifDisabled()
  {
    killerConfig = new UnusedSegmentKillerConfig(false, null);
    initKiller();

    Assert.assertFalse(killer.isEnabled());

    killer.run();
    Assert.assertFalse(killExecutor.hasPendingTasks());
  }

  @Test
  public void test_run_doesNotProcessSegments_ifNotLeader()
  {
    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    leaderSelector.becomeLeader();
    killer.run();

    // Leadership is lost before the queued jobs actually run
    leaderSelector.stopBeingLeader();

    Assert.assertTrue(killExecutor.hasPendingTasks());

    finishQueuedKillJobs();
    emitter.verifyNotEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS);
    Assert.assertFalse(killExecutor.hasPendingTasks());
  }

  @Test
  public void test_run_launchesEmbeddedKillTasks_ifLeader()
  {
    leaderSelector.becomeLeader();

    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    // Reset the queue and verify that kill jobs have been added to the queue
    killer.run();
    Assert.assertTrue(killExecutor.hasPendingTasks());
    emitter.verifyNotEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS);

    finishQueuedKillJobs();
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);

    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.QUEUE_PROCESS_TIME, 1);
    emitter.verifyEmitted(TaskMetrics.RUN_DURATION, 10);

    emitter.verifyEmitted(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10);
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10L);

    emitter.verifyEmitted(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 10);
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 10L);

    Assert.assertTrue(
        retrieveUnusedSegments(Intervals.ETERNITY).isEmpty()
    );
  }

  @Test
  public void test_maxSegmentsKilledInAnInterval_is_1k()
  {
    leaderSelector.becomeLeader();

    // 2000 unused segments in a single day interval, twice the per-task limit
    final List<DataSegment> segments =
        CreateDataSegments.ofDatasource(TestDataSource.WIKI)
                          .forIntervals(1, Granularities.DAY)
                          .withNumPartitions(2000)
                          .eachOfSizeInMb(50);

    storageCoordinator.commitSegments(Set.copyOf(segments), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    Assert.assertEquals(
        2000,
        retrieveUnusedSegments(segments.get(0).getInterval()).size()
    );

    // Reset the kill queue and execute kill tasks
    killer.run();
    finishQueuedKillJobs();

    // Verify that a single kill task has run which killed 1k segments
    emitter.verifyEmitted(TaskMetrics.RUN_DURATION, 1);
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 1000L);

    Assert.assertEquals(
        1000,
        retrieveUnusedSegments(segments.get(0).getInterval()).size()
    );
  }

  @Test(timeout = 20_000L)
  public void test_maxIntervalsKilledInADatasource_is_10k()
  {
    leaderSelector.becomeLeader();

    // 20k day intervals, twice the per-datasource interval limit
    final List<DataSegment> segments =
        CreateDataSegments.ofDatasource(TestDataSource.WIKI)
                          .forIntervals(20_000, Granularities.DAY)
                          .eachOfSizeInMb(50);

    storageCoordinator.commitSegments(Set.copyOf(segments), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    Assert.assertEquals(
        20_000,
        retrieveUnusedSegments(Intervals.ETERNITY).size()
    );

    // Reset the kill queue and execute kill tasks
    killer.run();
    finishQueuedKillJobs();

    // Verify that 10k kill tasks have run, each killing a single segment
    emitter.verifyEmitted(TaskMetrics.RUN_DURATION, 10000);
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10_000L);

    Assert.assertEquals(
        10_000,
        retrieveUnusedSegments(Intervals.ETERNITY).size()
    );
  }

  @Test
  public void test_run_resetsQueue_ifLeadershipIsReacquired()
  {
    leaderSelector.becomeLeader();

    // Verify that the queue has been reset
    killer.run();
    finishQueuedKillJobs();
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.QUEUE_RESET_TIME, 1);
    emitter.verifyNotEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS);

    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    // Lose and reacquire leadership
    leaderSelector.stopBeingLeader();
    leaderSelector.becomeLeader();

    // Run again and verify that queue has been reset
    emitter.flush();
    killer.run();
    finishQueuedKillJobs();
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.QUEUE_RESET_TIME, 1);
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);
  }

  @Test
  public void test_run_doesNotResetQueue_ifThereArePendingJobs_andLastRunWasLessThanOneDayAgo()
  {
    leaderSelector.becomeLeader();

    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    // Invoke run, reset the queue and process only some of the jobs
    // (6 executor tasks = queue rebuild + 5 kill jobs)
    killer.run();
    killExecutor.finishNextPendingTasks(6);
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.QUEUE_RESET_TIME, 1);
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 5);

    Assert.assertTrue(killExecutor.hasPendingTasks());

    // Invoke run again and verify that queue has not been reset
    emitter.flush();
    killer.run();
    finishQueuedKillJobs();
    emitter.verifyNotEmitted(UnusedSegmentsKiller.Metric.QUEUE_RESET_TIME);
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 5);

    // All jobs have been processed
    Assert.assertFalse(killExecutor.hasPendingTasks());
  }

  @Test
  public void test_run_prioritizesOlderIntervals()
  {
    leaderSelector.becomeLeader();

    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    killer.run();
    finishQueuedKillJobs();
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);

    // Verify that the kill intervals are sorted with the oldest interval first.
    // The interval is recovered from the task ID, whose underscore-separated
    // parts 4 and 5 are the interval start and end.
    final List<StubServiceEmitter.ServiceMetricEventSnapshot> events =
        emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION);
    final List<Interval> killIntervals = events.stream().map(event -> {
      final String taskId = (String) event.getUserDims().get(DruidMetrics.TASK_ID);
      String[] splits = taskId.split("_");
      return Intervals.of(splits[4] + "/" + splits[5]);
    }).collect(Collectors.toList());

    Assert.assertEquals(10, killIntervals.size());

    final List<Interval> expectedIntervals =
        WIKI_SEGMENTS_1X10D.stream()
                           .map(DataSegment::getInterval)
                           .sorted(Comparators.intervalsByEndThenStart())
                           .collect(Collectors.toList());
    Assert.assertEquals(expectedIntervals, killIntervals);
  }

  @Test
  public void test_run_doesNotDeleteSegmentFiles_ifLoadSpecIsUsedByAnotherSegment()
  {
    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    // Add a new segment upgraded from one of the unused segments
    final DataSegment upgradedSegment1 = WIKI_SEGMENTS_1X10D.get(0).withVersion("v2");
    final DataSegment upgradedSegment2 = WIKI_SEGMENTS_1X10D.get(1).withVersion("v2");
    storageCoordinator.commitSegments(Set.of(upgradedSegment1, upgradedSegment2), null);

    leaderSelector.becomeLeader();
    killer.run();

    // Verify that all unused segments are deleted from metadata store but the
    // ones with used load specs are not deleted from the deep store
    finishQueuedKillJobs();
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10L);
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 8L);
  }

  @Test
  public void test_run_doesNotKillSegment_ifUpdatedWithinBufferPeriod()
  {
    // Use a 1 hour buffer period so freshly-unused segments are not yet eligible
    killerConfig = new UnusedSegmentKillerConfig(true, Period.hours(1));
    initKiller();

    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    leaderSelector.becomeLeader();
    killer.run();
    finishQueuedKillJobs();

    // Verify that tasks are launched but no segment is killed
    emitter.verifyValue(UnusedSegmentsKiller.Metric.UNUSED_SEGMENT_INTERVALS, 10L);
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);
    emitter.verifyEmitted(TaskMetrics.RUN_DURATION, 10);

    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 0L);
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 0L);
  }

  @Test
  public void test_run_killsFutureSegment()
  {
    // Segments with intervals entirely in the future should still be killed
    final List<DataSegment> futureSegments = CreateDataSegments
        .ofDatasource(TestDataSource.WIKI)
        .forIntervals(10, Granularities.DAY)
        .startingAt("2050-01-01")
        .eachOfSize(500);
    storageCoordinator.commitSegments(Set.copyOf(futureSegments), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    leaderSelector.becomeLeader();
    killer.run();
    finishQueuedKillJobs();

    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10L);
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 10L);
  }

  @Test
  public void test_run_skipsLockedIntervals() throws InterruptedException
  {
    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    // Lock up some of the segment intervals (the first 5 of the 10 days)
    final Interval lockedInterval = new Interval(
        WIKI_SEGMENTS_1X10D.get(0).getInterval().getStart(),
        WIKI_SEGMENTS_1X10D.get(4).getInterval().getEnd()
    );

    final Task ingestionTask = new NoopTask(null, null, TestDataSource.WIKI, 0L, 0L, null);
    final TaskLockbox taskLockbox = taskActionTestKit.getTaskLockbox();

    try {
      taskLockbox.add(ingestionTask);
      taskLockbox.lock(
          ingestionTask,
          new TimeChunkLockRequest(TaskLockType.APPEND, ingestionTask, lockedInterval, null)
      );

      leaderSelector.becomeLeader();
      killer.run();
      finishQueuedKillJobs();

      // Verify that unused segments from locked intervals are not killed
      emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 5L);
      emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 5L);
      emitter.verifySum(UnusedSegmentsKiller.Metric.SKIPPED_INTERVALS, 5L);
    }
    finally {
      taskLockbox.remove(ingestionTask);
    }

    // Do another run to clean up the rest of the segments
    killer.run();
    finishQueuedKillJobs();
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10L);
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 10L);
  }

  /**
   * Retrieves unused WIKI segments overlapping the given interval from the
   * metadata store.
   */
  private List<DataSegment> retrieveUnusedSegments(Interval interval)
  {
    return storageCoordinator.retrieveUnusedSegmentsForInterval(
        TestDataSource.WIKI,
        interval,
        null,
        null
    );
  }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index bd8d6481ba6..d0e639db2c2 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -56,6 +56,29 @@ public class TestIndexerMetadataStorageCoordinator
implements IndexerMetadataSto
private int deleteSegmentsCount = 0;
+ @Override
+ public Set<String> retrieveAllDatasourceNames()
+ {
+ return Set.of();
+ }
+
+ @Override
+ public List<Interval> retrieveUnusedSegmentIntervals(String dataSource, int
limit)
+ {
+ return List.of();
+ }
+
+ @Override
+ public List<DataSegment> retrieveUnusedSegmentsWithExactInterval(
+ String dataSource,
+ Interval interval,
+ DateTime maxUpdatedTime,
+ int limit
+ )
+ {
+ return List.of();
+ }
+
@Override
public DataSourceMetadata retrieveDataSourceMetadata(String dataSource)
{
@@ -80,12 +103,6 @@ public class TestIndexerMetadataStorageCoordinator
implements IndexerMetadataSto
return false;
}
- @Override
- public Set<String> retrieveAllDatasourceNames()
- {
- return Set.of();
- }
-
@Override
public Set<DataSegment> retrieveAllUsedSegments(String dataSource, Segments
visibility)
{
@@ -286,10 +303,11 @@ public class TestIndexerMetadataStorageCoordinator
implements IndexerMetadataSto
}
@Override
- public void deleteSegments(Set<DataSegment> segments)
+ public int deleteSegments(Set<DataSegment> segments)
{
deleteSegmentsCount++;
nuked.addAll(segments);
+ return segments.size();
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java
b/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java
index a877d2bf7b1..750f9090c26 100644
---
a/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java
+++
b/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java
@@ -56,7 +56,7 @@ public class KillTaskReport implements TaskReport
@Override
@JsonProperty
- public Object getPayload()
+ public Stats getPayload()
{
return stats;
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
b/processing/src/main/java/org/apache/druid/segment/loading/SegmentKillResult.java
similarity index 52%
copy from
server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
copy to
processing/src/main/java/org/apache/druid/segment/loading/SegmentKillResult.java
index cb02f7c8f38..ff3c3b8a98b 100644
---
a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
+++
b/processing/src/main/java/org/apache/druid/segment/loading/SegmentKillResult.java
@@ -17,35 +17,33 @@
* under the License.
*/
-package org.apache.druid.metadata.segment;
+package org.apache.druid.segment.loading;
-import org.skife.jdbi.v2.Handle;
-
-import java.io.Closeable;
+import java.util.List;
/**
- * Represents a single transaction involving read of segment metadata into
- * the metadata store. A transaction is associated with a single instance of a
- * {@link Handle} and is meant to be short-lived.
+ * Result of killing data segments using {@link DataSegmentKiller}.
*/
-public interface SegmentMetadataReadTransaction
- extends DatasourceSegmentMetadataReader, Closeable
+public class SegmentKillResult
{
- /**
- * @return The JDBI handle used in this transaction
- */
- Handle getHandle();
+ private static final SegmentKillResult EMPTY_INSTANCE = new
SegmentKillResult(List.of());
+
+ public static SegmentKillResult empty()
+ {
+ return EMPTY_INSTANCE;
+ }
- /**
- * Completes the transaction by either committing it or rolling it back.
- */
- @Override
- void close();
+ private final List<String> deletedPaths;
- @FunctionalInterface
- interface Callback<T>
+ public SegmentKillResult(
+ List<String> deletedPaths
+ )
{
- T inTransaction(SegmentMetadataReadTransaction transaction) throws
Exception;
+ this.deletedPaths = deletedPaths;
}
+ public List<String> getDeletedPaths()
+ {
+ return deletedPaths;
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java
b/processing/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java
index 98e538e3b06..5ce60dcfaeb 100644
---
a/processing/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java
+++
b/processing/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java
@@ -22,6 +22,7 @@ package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@@ -50,7 +51,7 @@ public interface MetricsVerifier
* Verifies that the metric was emitted for the given dimension filters the
* expected number of times.
*/
- default void verifyEmitted(String metricName, Map<String, Object>
dimensionFilters, int times)
+ default void verifyEmitted(String metricName, @Nullable Map<String, Object>
dimensionFilters, int times)
{
Assert.assertEquals(
StringUtils.format("Metric [%s] was emitted unexpected number of
times.", metricName),
@@ -71,7 +72,7 @@ public interface MetricsVerifier
* Verifies the value of the event corresponding to the specified metric and
* dimensionFilters emitted in the previous run.
*/
- default void verifyValue(String metricName, Map<String, Object>
dimensionFilters, Number expectedValue)
+ default void verifyValue(String metricName, @Nullable Map<String, Object>
dimensionFilters, Number expectedValue)
{
Assert.assertEquals(expectedValue, getValue(metricName, dimensionFilters));
}
@@ -80,7 +81,7 @@ public interface MetricsVerifier
* Gets the value of the event corresponding to the specified metric and
* dimensionFilters.
*/
- default Number getValue(String metricName, Map<String, Object>
dimensionFilters)
+ default Number getValue(String metricName, @Nullable Map<String, Object>
dimensionFilters)
{
List<Number> values = getMetricValues(metricName, dimensionFilters);
Assert.assertEquals(
@@ -91,9 +92,36 @@ public interface MetricsVerifier
return values.get(0);
}
+  /**
+   * Verifies that the sum of values of all events corresponding to the specified
metric matches the expected sum.
+ */
+ default void verifySum(String metricName, long expectedSum)
+ {
+ verifySum(metricName, null, expectedSum);
+ }
+
+  /**
+   * Verifies that the sum of values of all events corresponding to the specified metric
+   * and dimensionFilters matches the expected sum.
+   */
+ default void verifySum(String metricName, @Nullable Map<String, Object>
dimensionFilters, long expectedSum)
+ {
+ long observedSum = getMetricValues(metricName, dimensionFilters)
+ .stream()
+ .mapToLong(Number::longValue)
+ .sum();
+ Assert.assertEquals(
+ StringUtils.format(
+ "Unexpected sum[%s] of metric[%s] with filters[%s]",
+ observedSum, metricName, dimensionFilters
+ ),
+ expectedSum, observedSum
+ );
+ }
+
/**
* Gets the metric values for the specified dimension filters.
*/
- List<Number> getMetricValues(String metricName, Map<String, Object>
dimensionFilters);
+ List<Number> getMetricValues(String metricName, @Nullable Map<String,
Object> dimensionFilters);
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index db62686cd59..13da5aa3ac8 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -150,6 +150,26 @@ public interface IndexerMetadataStorageCoordinator
@Nullable DateTime maxUsedStatusLastUpdatedTime
);
+ /**
+ * Retrieves unused segments from the metadata store that match the given
+ * interval exactly. There is no guarantee on the order of segments in the
list
+ * or on whether the limited list contains the highest or lowest segment IDs
+ * in the interval.
+ *
+ * @param interval Returned segments must exactly match this interval.
+ * @param maxUpdatedTime Returned segments must have a {@code
used_status_last_updated}
+ * which is either null or earlier than this value.
+ * @param limit Maximum number of segments to return.
+ *
+ * @return Unsorted list of unused segments that match the given parameters.
+ */
+ List<DataSegment> retrieveUnusedSegmentsWithExactInterval(
+ String dataSource,
+ Interval interval,
+ DateTime maxUpdatedTime,
+ int limit
+ );
+
/**
* Retrieves segments for the given IDs, regardless of their visibility
* (visible, overshadowed or unused).
@@ -456,7 +476,12 @@ public interface IndexerMetadataStorageCoordinator
void updateSegmentMetadata(Set<DataSegment> segments);
- void deleteSegments(Set<DataSegment> segments);
+ /**
+ * Deletes unused segments from the metadata store.
+ *
+ * @return Number of segments actually deleted.
+ */
+ int deleteSegments(Set<DataSegment> segments);
/**
* Retrieve the segment for a given id from the metadata store. Return null
if no such segment exists
@@ -556,6 +581,15 @@ public interface IndexerMetadataStorageCoordinator
DateTime maxUsedStatusLastUpdatedTime
);
+ /**
+ * Retrieves intervals of the specified datasource that contain any unused
segments.
+ * There is no guarantee on the order of intervals in the list or on whether
+ * the limited list contains the earliest or latest intervals of the
datasource.
+ *
+   * @return Unsorted list of unused segment intervals containing up to {@code
limit} entries.
+ */
+ List<Interval> retrieveUnusedSegmentIntervals(String dataSource, int limit);
+
/**
* Returns the number of segment entries in the database whose state was
changed as the result of this call (that is,
* the segments were marked as used). If the call results in a database
error, an exception is relayed to the caller.
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 8ede92dc9c5..722b3a8155e 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -48,7 +48,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.segment.DatasourceSegmentMetadataWriter;
import org.apache.druid.metadata.segment.SegmentMetadataReadTransaction;
import org.apache.druid.metadata.segment.SegmentMetadataTransaction;
import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
@@ -158,6 +157,14 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
);
}
+ @Override
+ public List<Interval> retrieveUnusedSegmentIntervals(String dataSource, int
limit)
+ {
+ return inReadOnlyTransaction(
+ sql -> sql.retrieveUnusedSegmentIntervals(dataSource, limit)
+ );
+ }
+
@Override
public Set<DataSegment> retrieveUsedSegmentsForIntervals(
final String dataSource,
@@ -225,7 +232,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
{
final List<DataSegment> matchingSegments = inReadOnlyDatasourceTransaction(
dataSource,
- transaction -> transaction.findUnusedSegments(
+ transaction -> transaction.noCacheSql().findUnusedSegments(
+ dataSource,
interval,
versions,
limit,
@@ -241,6 +249,24 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
return matchingSegments;
}
+ @Override
+ public List<DataSegment> retrieveUnusedSegmentsWithExactInterval(
+ String dataSource,
+ Interval interval,
+ DateTime maxUpdatedTime,
+ int limit
+ )
+ {
+ return inReadOnlyTransaction(
+ sql -> sql.retrieveUnusedSegmentsWithExactInterval(
+ dataSource,
+ interval,
+ maxUpdatedTime,
+ limit
+ )
+ );
+ }
+
@Override
public Set<DataSegment> retrieveSegmentsById(String dataSource, Set<String>
segmentIds)
{
@@ -1574,7 +1600,8 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
// If yes, try to compute allocated partition num using the max unused
segment shard spec
- SegmentId unusedMaxId = transaction.findHighestUnusedSegmentId(
+ SegmentId unusedMaxId =
transaction.noCacheSql().retrieveHighestUnusedSegmentId(
+ allocatedId.getDataSource(),
allocatedId.getInterval(),
allocatedId.getVersion()
);
@@ -1617,7 +1644,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
{
return inReadWriteDatasourceTransaction(
dataSource,
- DatasourceSegmentMetadataWriter::deleteAllPendingSegments
+ SegmentMetadataTransaction::deleteAllPendingSegments
);
}
@@ -2305,11 +2332,11 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
@Override
- public void deleteSegments(final Set<DataSegment> segments)
+ public int deleteSegments(final Set<DataSegment> segments)
{
if (segments.isEmpty()) {
log.info("No segments to delete.");
- return;
+ return 0;
}
final String dataSource = verifySegmentsToCommit(segments);
@@ -2322,7 +2349,7 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
);
log.debugSegments(segments, "Delete the metadata of segments");
- log.info("Deleted [%d] segments from metadata storage for dataSource
[%s].", numDeletedSegments, dataSource);
+ return numDeletedSegments;
}
@Override
diff --git
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
index f53f3108221..5718a0bdc9a 100644
---
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
+++
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
+import org.apache.druid.error.DruidException;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.joda.time.Period;
@@ -39,14 +40,29 @@ public class SegmentsMetadataManagerConfig
@JsonProperty
private final SegmentMetadataCache.UsageMode useIncrementalCache;
+ @JsonProperty
+ private final UnusedSegmentKillerConfig killUnused;
+
@JsonCreator
public SegmentsMetadataManagerConfig(
@JsonProperty("pollDuration") Period pollDuration,
- @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode
useIncrementalCache
+ @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode
useIncrementalCache,
+ @JsonProperty("killUnused") UnusedSegmentKillerConfig killUnused
)
{
this.pollDuration = Configs.valueOrDefault(pollDuration,
Period.minutes(1));
this.useIncrementalCache = Configs.valueOrDefault(useIncrementalCache,
SegmentMetadataCache.UsageMode.NEVER);
+ this.killUnused = Configs.valueOrDefault(killUnused, new
UnusedSegmentKillerConfig(null, null));
+ if (this.killUnused.isEnabled() && this.useIncrementalCache ==
SegmentMetadataCache.UsageMode.NEVER) {
+ throw DruidException
+ .forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ "Segment metadata cache must be enabled to allow killing of
unused segments."
+ + " Set 'druid.manager.segments.useIncrementalCache=always'"
+ + " or 'druid.manager.segments.useIncrementalCache=ifSynced' to
enable the cache."
+ );
+ }
}
/**
@@ -61,4 +77,9 @@ public class SegmentsMetadataManagerConfig
{
return pollDuration;
}
+
+ public UnusedSegmentKillerConfig getKillUnused()
+ {
+ return killUnused;
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index de2e35813c4..2779c16da17 100644
---
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
@@ -187,8 +188,9 @@ public class SqlSegmentsMetadataQuery
}
/**
- * Determines the highest ID amongst unused segments for the given
datasource,
- * interval and version.
+ * Retrieves the ID of the unused segment that has the highest partition
+ * number amongst all unused segments that exactly match the given interval
+ * and version.
*
* @return null if no unused segment exists for the given parameters.
*/
@@ -286,6 +288,37 @@ public class SqlSegmentsMetadataQuery
}
}
+ /**
+ * Retrieves unused segments that are fully contained within the given
interval.
+ *
+ * @param interval Returned segments must be fully contained within
this
+ * interval
+ * @param versions Optional list of segment versions. If passed as
null,
+ * all segment versions are eligible.
+ * @param limit Maximum number of segments to return. If passed as
null,
+ * all segments are returned.
+ * @param maxUpdatedTime Returned segments must have a {@code
used_status_last_updated}
+ * which is either null or earlier than this value.
+ */
+ public List<DataSegment> findUnusedSegments(
+ String dataSource,
+ Interval interval,
+ @Nullable List<String> versions,
+ @Nullable Integer limit,
+ @Nullable DateTime maxUpdatedTime
+ )
+ {
+ try (
+ final CloseableIterator<DataSegment> iterator =
+ retrieveUnusedSegments(dataSource, List.of(interval), versions,
limit, null, null, maxUpdatedTime)
+ ) {
+ return ImmutableList.copyOf(iterator);
+ }
+ catch (IOException e) {
+ throw InternalServerError.exception(e, "Error while reading unused
segments");
+ }
+ }
+
/**
* Retrieves segments for a given datasource that are marked unused and that
are <b>fully contained by</b> any interval
* in a particular collection of intervals. If the collection of intervals
is empty, this method will retrieve all
@@ -1027,6 +1060,74 @@ public class SqlSegmentsMetadataQuery
return
unusedIntervals.stream().filter(Objects::nonNull).collect(Collectors.toList());
}
+ /**
+ * Gets unused segment intervals for the specified datasource. There is no
+ * guarantee on the order of intervals in the list or on whether the limited
+ * list contains the earliest or latest intervals present in the datasource.
+ *
+   * @return List of unused segment intervals containing up to {@code limit}
entries.
+ */
+ public List<Interval> retrieveUnusedSegmentIntervals(String dataSource, int
limit)
+ {
+ final String sql = StringUtils.format(
+ "SELECT start, %2$send%2$s FROM %1$s"
+ + " WHERE dataSource = :dataSource AND used = false"
+ + " GROUP BY start, %2$send%2$s"
+ + " %3$s",
+ dbTables.getSegmentsTable(), connector.getQuoteString(),
connector.limitClause(limit)
+ );
+
+ final List<Interval> intervals = connector.inReadOnlyTransaction(
+ (handle, status) ->
+ handle.createQuery(sql)
+ .setFetchSize(connector.getStreamingFetchSize())
+ .bind("dataSource", dataSource)
+ .map((index, r, ctx) -> mapToInterval(r, dataSource))
+ .list()
+ );
+
+ return
intervals.stream().filter(Objects::nonNull).collect(Collectors.toList());
+ }
+
+ /**
+ * Retrieves unused segments that exactly match the given interval.
+ *
+ * @param interval Returned segments must exactly match this interval.
+ * @param maxUpdatedTime Returned segments must have a {@code
used_status_last_updated}
+ * which is either null or earlier than this value.
+ * @param limit Maximum number of segments to return
+ */
+ public List<DataSegment> retrieveUnusedSegmentsWithExactInterval(
+ String dataSource,
+ Interval interval,
+ DateTime maxUpdatedTime,
+ int limit
+ )
+ {
+ final String sql = StringUtils.format(
+ "SELECT id, payload FROM %1$s"
+ + " WHERE dataSource = :dataSource AND used = false"
+ + " AND %2$send%2$s = :end AND start = :start"
+ + " AND (used_status_last_updated IS NULL OR used_status_last_updated
<= :maxUpdatedTime)"
+ + " %3$s",
+ dbTables.getSegmentsTable(), connector.getQuoteString(),
connector.limitClause(limit)
+ );
+
+ final List<DataSegment> segments = connector.inReadOnlyTransaction(
+ (handle, status) ->
+ handle.createQuery(sql)
+ .setFetchSize(connector.getStreamingFetchSize())
+ .bind("dataSource", dataSource)
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString())
+ .bind("maxUpdatedTime", maxUpdatedTime.toString())
+ .map((index, r, ctx) -> mapToSegment(r))
+ .list()
+ );
+
+ return
segments.stream().filter(Objects::nonNull).collect(Collectors.toList());
+ }
+
/**
* Retrieve the used segment for a given id if it exists in the metadata
store and null otherwise
*/
@@ -1608,6 +1709,20 @@ public class SqlSegmentsMetadataQuery
}).iterator();
}
+ @Nullable
+ private DataSegment mapToSegment(ResultSet resultSet)
+ {
+ String segmentId = "";
+ try {
+ segmentId = resultSet.getString("id");
+ return JacksonUtils.readValue(jsonMapper, resultSet.getBytes("payload"),
DataSegment.class);
+ }
+ catch (Throwable t) {
+ log.error(t, "Could not read segment with ID[%s]", segmentId);
+ return null;
+ }
+ }
+
private UnmodifiableIterator<DataSegment>
filterDataSegmentIteratorByInterval(
ResultIterator<DataSegment> resultIterator,
final Collection<Interval> intervals,
diff --git
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
b/server/src/main/java/org/apache/druid/metadata/UnusedSegmentKillerConfig.java
similarity index 52%
copy from
server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
copy to
server/src/main/java/org/apache/druid/metadata/UnusedSegmentKillerConfig.java
index f53f3108221..082b08251a8 100644
---
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
+++
b/server/src/main/java/org/apache/druid/metadata/UnusedSegmentKillerConfig.java
@@ -22,43 +22,42 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
-import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.joda.time.Period;
+import javax.annotation.Nullable;
+
/**
- * Config that dictates polling and caching of segment metadata on leader
- * Coordinator or Overlord services.
+ * Config for {@code UnusedSegmentKiller}. This is used only by the Overlord.
+ * Enabling this config on the Coordinator or other services has no effect.
*/
-public class SegmentsMetadataManagerConfig
+public class UnusedSegmentKillerConfig
{
- public static final String CONFIG_PREFIX = "druid.manager.segments";
-
- @JsonProperty
- private final Period pollDuration;
+ @JsonProperty("enabled")
+ private final boolean enabled;
- @JsonProperty
- private final SegmentMetadataCache.UsageMode useIncrementalCache;
+ @JsonProperty("bufferPeriod")
+ private final Period bufferPeriod;
@JsonCreator
- public SegmentsMetadataManagerConfig(
- @JsonProperty("pollDuration") Period pollDuration,
- @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode
useIncrementalCache
+ public UnusedSegmentKillerConfig(
+ @JsonProperty("enabled") @Nullable Boolean enabled,
+ @JsonProperty("bufferPeriod") @Nullable Period bufferPeriod
)
{
- this.pollDuration = Configs.valueOrDefault(pollDuration,
Period.minutes(1));
- this.useIncrementalCache = Configs.valueOrDefault(useIncrementalCache,
SegmentMetadataCache.UsageMode.NEVER);
+ this.enabled = Configs.valueOrDefault(enabled, false);
+ this.bufferPeriod = Configs.valueOrDefault(bufferPeriod, Period.days(30));
}
/**
- * Usage mode of the incremental cache.
+ * Period for which segments are retained even after being marked as unused.
*/
- public SegmentMetadataCache.UsageMode getCacheUsageMode()
+ public Period getBufferPeriod()
{
- return useIncrementalCache;
+ return bufferPeriod;
}
- public Period getPollDuration()
+ public boolean isEnabled()
{
- return pollDuration;
+ return enabled;
}
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java
b/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java
index 3d19d7c7365..cc949a5dd6e 100644
---
a/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java
+++
b/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java
@@ -22,6 +22,7 @@ package org.apache.druid.metadata.segment;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.error.DruidException;
import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.metadata.SqlSegmentsMetadataQuery;
import org.apache.druid.metadata.segment.cache.DatasourceSegmentCache;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.http.DataSegmentPlus;
@@ -118,6 +119,12 @@ class CachedSegmentMetadataTransaction implements
SegmentMetadataTransaction
return delegate.getHandle();
}
+ @Override
+ public SqlSegmentsMetadataQuery noCacheSql()
+ {
+ return delegate.noCacheSql();
+ }
+
@Override
public void setRollbackOnly()
{
diff --git
a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
b/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
index cb02f7c8f38..924a8c3c54b 100644
---
a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
+++
b/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
@@ -19,6 +19,7 @@
package org.apache.druid.metadata.segment;
+import org.apache.druid.metadata.SqlSegmentsMetadataQuery;
import org.skife.jdbi.v2.Handle;
import java.io.Closeable;
@@ -36,6 +37,11 @@ public interface SegmentMetadataReadTransaction
*/
Handle getHandle();
+ /**
+ * @return SQL tool to read or update the metadata store directly.
+ */
+ SqlSegmentsMetadataQuery noCacheSql();
+
/**
* Completes the transaction by either committing it or rolling it back.
*/
diff --git
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransaction.java
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransaction.java
index 116832dfe31..5c9938f8c5a 100644
---
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransaction.java
+++
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransaction.java
@@ -96,6 +96,12 @@ class SqlSegmentMetadataTransaction implements
SegmentMetadataTransaction
return handle;
}
+ @Override
+ public SqlSegmentsMetadataQuery noCacheSql()
+ {
+ return query;
+ }
+
@Override
public void setRollbackOnly()
{
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 64d81a066f9..26b34a36845 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -67,10 +67,10 @@ import java.util.concurrent.ConcurrentHashMap;
* The datasources to be killed during each cycle are selected from {@link
#datasourceCircularKillList}. This state is
* refreshed in a run if the set of datasources to be killed changes.
Consecutive duplicate datasources are avoided
* across runs, provided there are other datasources to be killed.
- * </p>
- * <p>
- * See {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}.
- * </p>
+ *
+ * @see org.apache.druid.indexing.common.task.KillUnusedSegmentsTask for
details
+ * of the actual kill task and {@code UnusedSegmentKiller} to run embedded kill
+ * tasks on the Overlord.
*/
public class KillUnusedSegments implements CoordinatorDuty
{
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
index 48656aa93e9..72ee7a9a169 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
@@ -101,7 +101,7 @@ public class
IndexerSQLMetadataStorageCoordinatorReadOnlyTest extends IndexerSql
cachePollExecutor = new BlockingExecutorService("test-cache-poll-exec");
segmentMetadataCache = new HeapMemorySegmentMetadataCache(
mapper,
- () -> new SegmentsMetadataManagerConfig(null, cacheMode),
+ () -> new SegmentsMetadataManagerConfig(null, cacheMode, null),
derbyConnectorRule.metadataTablesConfigSupplier(),
new NoopSegmentSchemaCache(),
derbyConnector,
diff --git
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 9a1b7ce63ed..db5c23b8d30 100644
---
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -151,7 +151,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
segmentMetadataCache = new HeapMemorySegmentMetadataCache(
mapper,
- () -> new SegmentsMetadataManagerConfig(null, cacheMode),
+ () -> new SegmentsMetadataManagerConfig(null, cacheMode, null),
derbyConnectorRule.metadataTablesConfigSupplier(),
new NoopSegmentSchemaCache(),
derbyConnector,
@@ -1831,6 +1831,99 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
Assert.assertTrue(SEGMENTS.containsAll(retreivedUnusedSegments));
}
+ @Test
+ public void testRetrieveUnusedSegmentsWithExactInterval()
+ {
+ final String dataSource = defaultSegment.getDataSource();
+ coordinator.commitSegments(Set.of(defaultSegment, defaultSegment2,
defaultSegment3), null);
+
+ final DateTime now = DateTimes.nowUtc();
+ markAllSegmentsUnused(Set.of(defaultSegment, defaultSegment2,
defaultSegment3), now.minusHours(1));
+
+ // Verify that query for overlapping interval does not return the segments
+ Assert.assertTrue(
+ coordinator.retrieveUnusedSegmentsWithExactInterval(
+ dataSource,
+ Intervals.ETERNITY,
+ now,
+ 10
+ ).isEmpty()
+ );
+
+ // Verify that query for exact interval returns the segments
+ Assert.assertEquals(
+ List.of(defaultSegment3),
+ coordinator.retrieveUnusedSegmentsWithExactInterval(
+ dataSource,
+ defaultSegment3.getInterval(),
+ now,
+ 10
+ )
+ );
+
+ Assert.assertEquals(defaultSegment.getInterval(),
defaultSegment2.getInterval());
+ Assert.assertEquals(
+ Set.of(defaultSegment, defaultSegment2),
+ Set.copyOf(
+ coordinator.retrieveUnusedSegmentsWithExactInterval(
+ dataSource,
+ defaultSegment.getInterval(),
+ now,
+ 10
+ )
+ )
+ );
+
+ // Verify that query with limit 1 returns only 1 result
+ Assert.assertEquals(
+ 1,
+ coordinator.retrieveUnusedSegmentsWithExactInterval(
+ dataSource,
+ defaultSegment.getInterval(),
+ now,
+ 1
+ ).size()
+ );
+ }
+
+ @Test
+ public void testRetrieveUnusedSegmentIntervals()
+ {
+ final String dataSource = defaultSegment.getDataSource();
+ coordinator.commitSegments(Set.of(defaultSegment, defaultSegment3), null);
+
+ Assert.assertTrue(coordinator.retrieveUnusedSegmentIntervals(dataSource,
100).isEmpty());
+
+ markAllSegmentsUnused(Set.of(defaultSegment),
DateTimes.nowUtc().minusHours(1));
+ Assert.assertEquals(
+ List.of(defaultSegment.getInterval()),
+ coordinator.retrieveUnusedSegmentIntervals(dataSource, 100)
+ );
+
+ markAllSegmentsUnused(Set.of(defaultSegment3),
DateTimes.nowUtc().minusHours(1));
+ Assert.assertEquals(
+ Set.of(defaultSegment.getInterval(), defaultSegment3.getInterval()),
+ Set.copyOf(coordinator.retrieveUnusedSegmentIntervals(dataSource, 100))
+ );
+
+ // Verify retrieve with limit 1 returns only 1 interval
+ Assert.assertEquals(
+ 1,
+ coordinator.retrieveUnusedSegmentIntervals(dataSource, 1).size()
+ );
+ }
+
+ @Test
+ public void testRetrieveAllDatasourceNames()
+ {
+ coordinator.commitSegments(Set.of(defaultSegment), null);
+ coordinator.commitSegments(Set.of(hugeTimeRangeSegment1), null);
+ Assert.assertEquals(
+ Set.of("fooDataSource", "hugeTimeRangeDataSource"),
+ coordinator.retrieveAllDatasourceNames()
+ );
+ }
+
@Test
public void testUsedOverlapLow()
{
@@ -3742,7 +3835,11 @@ public class IndexerSQLMetadataStorageCoordinatorTest
extends IndexerSqlMetadata
SegmentId highestUnusedId =
transactionFactory.inReadWriteDatasourceTransaction(
TestDataSource.WIKI,
- transaction ->
transaction.findHighestUnusedSegmentId(Intervals.of("2024/2025"), "v1")
+ transaction -> transaction.noCacheSql().retrieveHighestUnusedSegmentId(
+ TestDataSource.WIKI,
+ Intervals.of("2024/2025"),
+ "v1"
+ )
);
Assert.assertEquals(
unusedSegmentForExactIntervalAndVersion.getId(),
diff --git
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
index fb310430ac3..44812c3acbf 100644
---
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
@@ -46,7 +46,7 @@ public class SqlSegmentsMetadataManagerProviderTest
public void testLifecycleStartCreatesSegmentTables() throws Exception
{
final TestDerbyConnector connector = derbyConnectorRule.getConnector();
- final SegmentsMetadataManagerConfig config = new
SegmentsMetadataManagerConfig(null, null);
+ final SegmentsMetadataManagerConfig config = new
SegmentsMetadataManagerConfig(null, null, null);
final Lifecycle lifecycle = new Lifecycle();
final SegmentSchemaCache segmentSchemaCache = new SegmentSchemaCache();
SqlSegmentsMetadataManagerProvider provider = new
SqlSegmentsMetadataManagerProvider(
diff --git
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java
index b1d8335a227..8e7e6a7fc5b 100644
---
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java
@@ -109,7 +109,7 @@ public class SqlSegmentsMetadataManagerSchemaPollTest
extends SqlSegmentsMetadat
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
= CentralizedDatasourceSchemaConfig.enabled(true);
- config = new SegmentsMetadataManagerConfig(Period.seconds(3), null);
+ config = new SegmentsMetadataManagerConfig(Period.seconds(3), null, null);
sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
jsonMapper,
Suppliers.ofInstance(config),
@@ -193,7 +193,7 @@ public class SqlSegmentsMetadataManagerSchemaPollTest
extends SqlSegmentsMetadat
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
= CentralizedDatasourceSchemaConfig.enabled(true);
- config = new SegmentsMetadataManagerConfig(Period.seconds(3), null);
+ config = new SegmentsMetadataManagerConfig(Period.seconds(3), null, null);
sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
jsonMapper,
Suppliers.ofInstance(config),
diff --git
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
index 987d93dcd27..9e1ba861e1e 100644
---
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
@@ -384,7 +384,8 @@ public class SqlSegmentsMetadataManagerTest extends
SqlSegmentsMetadataManagerTe
final Interval theInterval =
Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000");
// Re-create SqlSegmentsMetadataManager with a higher poll duration
- final SegmentsMetadataManagerConfig config = new
SegmentsMetadataManagerConfig(Period.seconds(1), null);
+ final SegmentsMetadataManagerConfig config =
+ new SegmentsMetadataManagerConfig(Period.seconds(1), null, null);
sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
jsonMapper,
Suppliers.ofInstance(config),
diff --git
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java
index c35488c62eb..bedc41f11e8 100644
---
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java
+++
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java
@@ -72,7 +72,7 @@ public class SqlSegmentsMetadataManagerTestBase
protected void setUp(TestDerbyConnector.DerbyConnectorRule
derbyConnectorRule) throws Exception
{
- config = new SegmentsMetadataManagerConfig(Period.seconds(3), null);
+ config = new SegmentsMetadataManagerConfig(Period.seconds(3), null, null);
connector = derbyConnectorRule.getConnector();
storageConfig = derbyConnectorRule.metadataTablesConfigSupplier().get();
diff --git
a/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
b/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
index e5554b931ee..08098decdab 100644
---
a/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
+++
b/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
@@ -88,7 +88,7 @@ public class SqlSegmentsMetadataManagerV2Test extends
SqlSegmentsMetadataManager
segmentMetadataCacheExec = new BlockingExecutorService("test");
SegmentMetadataCache segmentMetadataCache = new
HeapMemorySegmentMetadataCache(
jsonMapper,
- Suppliers.ofInstance(new
SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
+ Suppliers.ofInstance(new
SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode, null)),
Suppliers.ofInstance(storageConfig),
useSchemaCache ? new SegmentSchemaCache() : new
NoopSegmentSchemaCache(),
connector,
diff --git
a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
index 2a5940b0495..fab64152ec8 100644
---
a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
+++
b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
@@ -117,7 +117,7 @@ public class HeapMemorySegmentMetadataCacheTest
throw new ISE("Test target has already been initialized with
caching[%s]", cache.isEnabled());
}
final SegmentsMetadataManagerConfig metadataManagerConfig
- = new SegmentsMetadataManagerConfig(null, cacheMode);
+ = new SegmentsMetadataManagerConfig(null, cacheMode, null);
schemaCache = useSchemaCache ? new SegmentSchemaCache() : new
NoopSegmentSchemaCache();
cache = new HeapMemorySegmentMetadataCache(
TestHelper.JSON_MAPPER,
diff --git
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index d1b4cdfb7a6..0dea1be5d1a 100644
---
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -132,7 +132,7 @@ public class CoordinatorSegmentMetadataCacheTest extends
CoordinatorSegmentMetad
Mockito.when(segmentsMetadataManager.getRecentDataSourcesSnapshot())
.thenReturn(DataSourcesSnapshot.fromUsedSegments(List.of()));
SegmentsMetadataManagerConfig metadataManagerConfig =
- new SegmentsMetadataManagerConfig(Period.millis(10), null);
+ new SegmentsMetadataManagerConfig(Period.millis(10), null, null);
segmentsMetadataManagerConfigSupplier =
Suppliers.ofInstance(metadataManagerConfig);
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java
index d84cbcff6ef..d526ccfaf6f 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java
@@ -23,16 +23,21 @@ import org.apache.druid.discovery.DruidLeaderSelector;
import javax.annotation.Nullable;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class TestDruidLeaderSelector implements DruidLeaderSelector
{
+ private final AtomicInteger localTerm = new AtomicInteger(0);
private final AtomicBoolean isLeader = new AtomicBoolean(false);
private volatile Listener listener;
public void becomeLeader()
{
- if (isLeader.compareAndSet(false, true) && listener != null) {
- listener.becomeLeader();
+ if (isLeader.compareAndSet(false, true)) {
+ if (listener != null) {
+ listener.becomeLeader();
+ }
+ localTerm.incrementAndGet();
}
}
@@ -59,7 +64,7 @@ public class TestDruidLeaderSelector implements
DruidLeaderSelector
@Override
public int localTerm()
{
- return 0;
+ return localTerm.get();
}
@Override
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 95fa654674d..ef4901c68c3 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -93,6 +93,7 @@ import
org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.duty.OverlordDuty;
import org.apache.druid.indexing.overlord.duty.TaskLogAutoCleaner;
import org.apache.druid.indexing.overlord.duty.TaskLogAutoCleanerConfig;
+import org.apache.druid.indexing.overlord.duty.UnusedSegmentsKiller;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerResource;
import org.apache.druid.indexing.overlord.http.OverlordCompactionResource;
@@ -249,6 +250,8 @@ public class CliOverlord extends ServerRunnable
binder.bind(ShuffleClient.class).toProvider(Providers.of(null));
binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(new
NoopChatHandlerProvider()));
+ CliPeon.bindDataSegmentKiller(binder);
+
PolyBind.createChoice(
binder,
"druid.indexer.task.rowIngestionMeters.type",
@@ -445,9 +448,9 @@ public class CliOverlord extends ServerRunnable
private void configureOverlordHelpers(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.indexer.logs.kill",
TaskLogAutoCleanerConfig.class);
- Multibinder.newSetBinder(binder, OverlordDuty.class)
- .addBinding()
- .to(TaskLogAutoCleaner.class);
+ final Multibinder<OverlordDuty> dutyBinder =
Multibinder.newSetBinder(binder, OverlordDuty.class);
+ dutyBinder.addBinding().to(TaskLogAutoCleaner.class);
+
dutyBinder.addBinding().to(UnusedSegmentsKiller.class).in(LazySingleton.class);
}
},
new IndexingServiceInputSourceModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index e9cbab108d3..9cb939f9b89 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -468,14 +468,19 @@ public class CliPeon extends GuiceRunnable
static void bindPeonDataSegmentHandlers(Binder binder)
{
// Build it to make it bind even if nothing binds to it.
- Binders.dataSegmentKillerBinder(binder);
-
binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
+ bindDataSegmentKiller(binder);
Binders.dataSegmentMoverBinder(binder);
binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);
Binders.dataSegmentArchiverBinder(binder);
binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);
}
+ static void bindDataSegmentKiller(Binder binder)
+ {
+ Binders.dataSegmentKillerBinder(binder);
+
binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
+ }
+
private static void configureTaskActionClient(Binder binder)
{
binder.bind(TaskActionClientFactory.class)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]