This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3be94a09d81 Add embedded kill tasks that run on the Overlord (#18028)
3be94a09d81 is described below

commit 3be94a09d81d98bf5029c18832636e533cdbbb90
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Jun 4 00:16:56 2025 +0530

    Add embedded kill tasks that run on the Overlord (#18028)
    
    Description
    -----------
    This patch adds an embedded mode of running kill tasks on the Overlord 
itself.
    
    These embedded tasks
    - kill unused segments as soon as they become eligible for kill.
    - run on the Overlord and do not take up task slots.
    - finish faster as a separate task process is not involved.
    - kill a small number of segments per task, to ensure that locks on an 
interval are not held for too long.
    - skip locked intervals to avoid head-of-line blocking in kill tasks.
    - do not require any configuration.
    - can keep up with a large number of unused segments in the cluster.
    
    Changes
    ---------
    - Add `UnusedSegmentKiller`: `OverlordDuty` that launches embedded kill 
tasks
    - Add `UnusedSegmentKillerConfig` with the following fields:
      - `enabled`: Turns on segment killer on the Overlord
      - `bufferPeriod`: Period for which segments are retained even after being 
marked as unused.
    - Add `EmbeddedKillTask`: extends `KillUnusedSegmentsTask` to modify some 
behaviour
    - Add `KillTaskToolbox`: simplified version of `TaskToolbox` to run 
embedded kill tasks
    - Make minor changes to `KillUnusedSegmentsTask` to modify some behaviour 
for embedded tasks
    - Add methods to retrieve unused segments and intervals to 
`IndexerMetadataStorageCoordinator`
    and `SqlSegmentsMetadataQuery`
    - Add short-circuit to `TaskLockbox.remove` and make it idempotent
---
 .../druid/storage/s3/S3DataSegmentKiller.java      |   2 +-
 indexing-service/pom.xml                           |   5 +
 .../indexing/common/actions/SegmentNukeAction.java |  40 +-
 .../common/task/KillUnusedSegmentsTask.java        |  73 +++-
 .../druid/indexing/common/task/TaskMetrics.java    |  34 +-
 .../apache/druid/indexing/common/task/Tasks.java   |   6 +
 .../druid/indexing/overlord/TaskLockbox.java       |  15 +-
 .../indexing/overlord/duty/KillTaskToolbox.java    |  97 +++++
 .../overlord/duty/OverlordDutyExecutor.java        |  11 +-
 .../overlord/duty/UnusedSegmentsKiller.java        | 477 +++++++++++++++++++++
 .../indexing/common/actions/TaskActionTestKit.java |  37 +-
 .../indexing/common/task/IngestionTestBase.java    |   4 +-
 .../overlord/duty/OverlordDutyExecutorTest.java    |   4 +-
 .../overlord/duty/UnusedSegmentsKillerTest.java    | 462 ++++++++++++++++++++
 .../TestIndexerMetadataStorageCoordinator.java     |  32 +-
 .../druid/indexer/report/KillTaskReport.java       |   2 +-
 .../druid/segment/loading/SegmentKillResult.java   |  40 +-
 .../druid/java/util/metrics/MetricsVerifier.java   |  36 +-
 .../IndexerMetadataStorageCoordinator.java         |  36 +-
 .../IndexerSQLMetadataStorageCoordinator.java      |  41 +-
 .../metadata/SegmentsMetadataManagerConfig.java    |  23 +-
 .../druid/metadata/SqlSegmentsMetadataQuery.java   | 119 ++++-
 ...rConfig.java => UnusedSegmentKillerConfig.java} |  39 +-
 .../segment/CachedSegmentMetadataTransaction.java  |   7 +
 .../segment/SegmentMetadataReadTransaction.java    |   6 +
 .../segment/SqlSegmentMetadataTransaction.java     |   6 +
 .../coordinator/duty/KillUnusedSegments.java       |   8 +-
 ...rSQLMetadataStorageCoordinatorReadOnlyTest.java |   2 +-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 101 ++++-
 .../SqlSegmentsMetadataManagerProviderTest.java    |   2 +-
 .../SqlSegmentsMetadataManagerSchemaPollTest.java  |   4 +-
 .../metadata/SqlSegmentsMetadataManagerTest.java   |   3 +-
 .../SqlSegmentsMetadataManagerTestBase.java        |   2 +-
 .../segment/SqlSegmentsMetadataManagerV2Test.java  |   2 +-
 .../cache/HeapMemorySegmentMetadataCacheTest.java  |   2 +-
 .../CoordinatorSegmentMetadataCacheTest.java       |   2 +-
 .../simulate/TestDruidLeaderSelector.java          |  11 +-
 .../java/org/apache/druid/cli/CliOverlord.java     |   9 +-
 .../main/java/org/apache/druid/cli/CliPeon.java    |   9 +-
 39 files changed, 1649 insertions(+), 162 deletions(-)

diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
index fbe703a888e..79435869e7b 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
@@ -149,7 +149,7 @@ public class S3DataSegmentKiller implements 
DataSegmentKiller
       try {
         deleteObjectsRequest.setKeys(chunkOfKeys);
         log.info(
-            "Removing from bucket: [%s] the following index files: [%s] from 
s3!",
+            "Deleting the following segment files from S3 bucket[%s]: [%s]",
             s3Bucket,
             keysToDeleteStrings
         );
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 5c722530acf..da30e431e8f 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -267,6 +267,11 @@
             <artifactId>maven-resolver-api</artifactId>
             <version>1.3.1</version>
         </dependency>
+        <dependency>
+            <groupId>org.jdbi</groupId>
+            <artifactId>jdbi</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java
index 84445d3968a..6c73f664e3f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java
@@ -27,16 +27,23 @@ import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.CriticalAction;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
 
 import java.util.Set;
 import java.util.stream.Collectors;
 
+/**
+ * Permanently deletes unused segments from the metadata store.
+ */
 public class SegmentNukeAction implements TaskAction<Void>
 {
+  private static final Logger log = new Logger(SegmentNukeAction.class);
+
   private final Set<DataSegment> segments;
 
   @JsonCreator
@@ -65,22 +72,25 @@ public class SegmentNukeAction implements TaskAction<Void>
     TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), 
segments);
 
     try {
-      toolbox.getTaskLockbox().doInCriticalSection(
+      final Set<Interval> intervals = 
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet());
+      int numDeletedSegments = toolbox.getTaskLockbox().doInCriticalSection(
           task,
-          
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
-          CriticalAction.builder()
-                        .onValidLocks(
-                            () -> {
-                              
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
-                              return null;
-                            }
-                        )
-                        .onInvalidLocks(
-                            () -> {
-                              throw new ISE("Some locks for task[%s] are 
already revoked", task.getId());
-                            }
-                        )
-                        .build()
+          intervals,
+          CriticalAction.<Integer>builder().onValidLocks(
+              () -> 
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments)
+          ).onInvalidLocks(
+              () -> {
+                throw new ISE("Some locks for task[%s] are already revoked", 
task.getId());
+              }
+          ).build()
+      );
+
+      log.info(
+          "Deleted [%d] segments from metadata store out of requested[%d],"
+          + " across [%d] intervals[%s], for task[%s] of datasource[%s].",
+          numDeletedSegments, segments.size(),
+          intervals.size(), intervals,
+          task.getId(), task.getDataSource()
       );
     }
     catch (Exception e) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 06082a988d9..fe58c264ca8 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -212,7 +212,7 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
     int nextBatchSize = computeNextBatchSize(numSegmentsKilled);
     @Nullable Integer numTotalBatches = getNumTotalBatches();
     List<DataSegment> unusedSegments;
-    LOG.info(
+    logInfo(
         "Starting kill for datasource[%s] in interval[%s] and versions[%s] 
with batchSize[%d], up to limit[%d]"
         + " segments before maxUsedStatusLastUpdatedTime[%s] will be 
deleted%s",
         getDataSource(), getInterval(), getVersions(), batchSize, limit, 
maxUsedStatusLastUpdatedTime,
@@ -236,9 +236,7 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
         break;
       }
 
-      unusedSegments = toolbox.getTaskActionClient().submit(
-          new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), 
getVersions(), nextBatchSize, maxUsedStatusLastUpdatedTime)
-      );
+      unusedSegments = fetchNextBatchOfUnusedSegments(toolbox, nextBatchSize);
 
      // Fetch locks each time as a revocation could have occurred in between 
batches
       final NavigableMap<DateTime, List<TaskLock>> taskLockMap
@@ -283,6 +281,7 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
 
       // Nuke Segments
       taskActionClient.submit(new SegmentNukeAction(new 
HashSet<>(unusedSegments)));
+      emitMetric(toolbox.getEmitter(), 
TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, unusedSegments.size());
 
       // Determine segments to be killed
       final List<DataSegment> segmentsToBeKilled
@@ -290,22 +289,27 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
 
       final Set<DataSegment> segmentsNotKilled = new HashSet<>(unusedSegments);
       segmentsToBeKilled.forEach(segmentsNotKilled::remove);
-      LOG.infoSegments(
-          segmentsNotKilled,
-          "Skipping segment kill from deep storage as their load specs are 
referenced by other segments."
-      );
+
+      if (!segmentsNotKilled.isEmpty()) {
+        LOG.warn(
+            "Skipping kill of [%d] segments from deep storage as their load 
specs are used by other segments.",
+            segmentsNotKilled.size()
+        );
+      }
 
       toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
+      emitMetric(toolbox.getEmitter(), 
TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, segmentsToBeKilled.size());
+
       numBatchesProcessed++;
       numSegmentsKilled += segmentsToBeKilled.size();
 
-      LOG.info("Processed [%d] batches for kill task[%s].", 
numBatchesProcessed, getId());
+      logInfo("Processed [%d] batches for kill task[%s].", 
numBatchesProcessed, getId());
 
       nextBatchSize = computeNextBatchSize(numSegmentsKilled);
     } while (!unusedSegments.isEmpty() && (null == numTotalBatches || 
numBatchesProcessed < numTotalBatches));
 
     final String taskId = getId();
-    LOG.info(
+    logInfo(
         "Finished kill task[%s] for dataSource[%s] and interval[%s]."
         + " Deleted total [%d] unused segments in [%d] batches.",
         taskId, getDataSource(), getInterval(), numSegmentsKilled, 
numBatchesProcessed
@@ -322,9 +326,8 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
   }
 
   @JsonIgnore
-  @VisibleForTesting
   @Nullable
-  Integer getNumTotalBatches()
+  protected Integer getNumTotalBatches()
   {
     return null != limit ? (int) Math.ceil((double) limit / batchSize) : null;
   }
@@ -336,6 +339,31 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
     return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : 
batchSize;
   }
 
+  /**
+   * Fetches the next batch of unused segments that are eligible for kill.
+   */
+  protected List<DataSegment> fetchNextBatchOfUnusedSegments(TaskToolbox 
toolbox, int nextBatchSize) throws IOException
+  {
+    return toolbox.getTaskActionClient().submit(
+        new RetrieveUnusedSegmentsAction(
+            getDataSource(),
+            getInterval(),
+            getVersions(),
+            nextBatchSize,
+            maxUsedStatusLastUpdatedTime
+        )
+    );
+  }
+
+  /**
+   * Logs the given info message. Exposed here to allow embedded kill tasks to
+   * suppress info logs.
+   */
+  protected void logInfo(String message, Object... args)
+  {
+    LOG.info(message, args);
+  }
+
   private NavigableMap<DateTime, List<TaskLock>> 
getNonRevokedTaskLockMap(TaskActionClient client) throws IOException
   {
     final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
@@ -385,6 +413,10 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
         response.getUpgradedToSegmentIds().forEach((parent, children) -> {
           if (!CollectionUtils.isNullOrEmpty(children)) {
             // Do not kill segment if its parent or any of its siblings still 
exist in metadata store
+            LOG.info(
+                "Skipping kill of segments[%s] as its load spec is also used 
by segment IDs[%s].",
+                parentIdToUnusedSegments.get(parent), children
+            );
             parentIdToUnusedSegments.remove(parent);
           }
         });
@@ -402,10 +434,25 @@ public class KillUnusedSegmentsTask extends 
AbstractFixedIntervalTask
     return parentIdToUnusedSegments.values()
                                    .stream()
                                    .flatMap(Set::stream)
-                                   .filter(segment -> 
!usedSegmentLoadSpecs.contains(segment.getLoadSpec()))
+                                   .filter(segment -> 
!isSegmentLoadSpecPresentIn(segment, usedSegmentLoadSpecs))
                                    .collect(Collectors.toList());
   }
 
+  /**
+   * @return true if the load spec of the segment is present in the given set 
of
+   * used load specs.
+   */
+  private boolean isSegmentLoadSpecPresentIn(
+      DataSegment segment,
+      Set<Map<String, Object>> usedSegmentLoadSpecs
+  )
+  {
+    boolean isPresent = usedSegmentLoadSpecs.contains(segment.getLoadSpec());
+    if (isPresent) {
+      LOG.info("Skipping kill of segment[%s] as its load spec is also used by 
other segments.", segment);
+    }
+    return isPresent;
+  }
 
   @Override
   public LookupLoadingSpec getLookupLoadingSpec()
diff --git 
a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskMetrics.java
similarity index 52%
copy from 
server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
copy to 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskMetrics.java
index cb02f7c8f38..86456d12fdc 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskMetrics.java
@@ -17,35 +17,21 @@
  * under the License.
  */
 
-package org.apache.druid.metadata.segment;
-
-import org.skife.jdbi.v2.Handle;
-
-import java.io.Closeable;
+package org.apache.druid.indexing.common.task;
 
 /**
- * Represents a single transaction involving read of segment metadata into
- * the metadata store. A transaction is associated with a single instance of a
- * {@link Handle} and is meant to be short-lived.
+ * Task-related metrics emitted by the Druid cluster.
  */
-public interface SegmentMetadataReadTransaction
-    extends DatasourceSegmentMetadataReader, Closeable
+public class TaskMetrics
 {
-  /**
-   * @return The JDBI handle used in this transaction
-   */
-  Handle getHandle();
-
-  /**
-   * Completes the transaction by either committing it or rolling it back.
-   */
-  @Override
-  void close();
-
-  @FunctionalInterface
-  interface Callback<T>
+  private TaskMetrics()
   {
-    T inTransaction(SegmentMetadataReadTransaction transaction) throws 
Exception;
+    // no instantiation
   }
 
+  public static final String RUN_DURATION = "task/run/time";
+
+  public static final String SEGMENTS_DELETED_FROM_METADATA_STORE = 
"segment/killed/metadataStore/count";
+  public static final String SEGMENTS_DELETED_FROM_DEEPSTORE = 
"segment/killed/deepStorage/count";
+  public static final String FILES_DELETED_FROM_DEEPSTORE = 
"segment/killed/deepStorageFile/count";
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index 341ac0c9326..8e62113f0c7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -32,6 +32,12 @@ public class Tasks
   public static final int DEFAULT_BATCH_INDEX_TASK_PRIORITY = 50;
   public static final int DEFAULT_MERGE_TASK_PRIORITY = 25;
 
+  /**
+   * Priority of embedded kill tasks. Kept lower than batch and realtime tasks
+   * to allow them to preempt embedded kill tasks.
+   */
+  public static final int DEFAULT_EMBEDDED_KILL_TASK_PRIORITY = 25;
+
   static {
     Verify.verify(DEFAULT_MERGE_TASK_PRIORITY == 
DataSourceCompactionConfig.DEFAULT_COMPACTION_TASK_PRIORITY);
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 9815bb14a71..99168143f84 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -1232,16 +1232,17 @@ public class TaskLockbox
   {
     giant.lock();
     try {
-      try {
-        log.info("Removing task[%s] from activeTasks", task.getId());
-        cleanupUpgradeAndPendingSegments(task);
-        unlockAll(task);
-      }
-      finally {
-        activeTasks.remove(task.getId());
+      if (!activeTasks.contains(task.getId())) {
+        return;
       }
+      log.info("Removing task[%s] from activeTasks", task.getId());
+      cleanupUpgradeAndPendingSegments(task);
+      unlockAll(task);
     }
     finally {
+      if (task != null) {
+        activeTasks.remove(task.getId());
+      }
       giant.unlock();
     }
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/KillTaskToolbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/KillTaskToolbox.java
new file mode 100644
index 00000000000..dd2afe6017d
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/KillTaskToolbox.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
+
+import java.io.OutputStream;
+
+/**
+ * Wrapper over {@link TaskToolbox} used for embedded kill tasks launched by
+ * {@link UnusedSegmentsKiller}.
+ */
+public class KillTaskToolbox
+{
+  /**
+   * Creates a {@link TaskToolbox} with just enough dependencies to make the
+   * embedded kill tasks work in {@link UnusedSegmentsKiller}.
+   */
+  static TaskToolbox create(
+      TaskActionClient taskActionClient,
+      DataSegmentKiller dataSegmentKiller,
+      ServiceEmitter emitter
+  )
+  {
+    final ObjectMapper mapper = DefaultObjectMapper.INSTANCE;
+    final IndexIO indexIO = new IndexIO(mapper, ColumnConfig.DEFAULT);
+
+    return new TaskToolbox.Builder()
+        .taskActionClient(taskActionClient)
+        .dataSegmentKiller(dataSegmentKiller)
+        .taskReportFileWriter(NoopReportWriter.INSTANCE)
+        .indexIO(indexIO)
+        .indexMergerV9(new IndexMergerV9(mapper, indexIO, 
TmpFileSegmentWriteOutMediumFactory.instance(), false))
+        .emitter(emitter)
+        .build();
+  }
+
+  /**
+   * Noop report writer.
+   */
+  private static class NoopReportWriter extends SingleFileTaskReportFileWriter
+  {
+    private static final NoopReportWriter INSTANCE = new NoopReportWriter();
+
+    private NoopReportWriter()
+    {
+      super(null);
+    }
+
+    @Override
+    public void setObjectMapper(ObjectMapper objectMapper)
+    {
+      // Do nothing
+    }
+
+    @Override
+    public void write(String taskId, TaskReport.ReportMap reports)
+    {
+      // Do nothing, metrics are emitted by the KillUnusedSegmentsTask itself
+    }
+
+    @Override
+    public OutputStream openReportOutputStream(String taskId)
+    {
+      throw DruidException.defensive("Cannot write reports using this 
reporter");
+    }
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java
index 9ba4e90f0f6..689d0876894 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java
@@ -97,7 +97,12 @@ public class OverlordDutyExecutor
     initExecutor();
 
     final DutySchedule schedule = duty.getSchedule();
-    final String dutyName = duty.getClass().getName();
+    final String dutyName = duty.getClass().getSimpleName();
+
+    if (schedule == null || schedule.getPeriodMillis() <= 0) {
+      log.info("Not scheduling overlord duty[%s] as it has no period 
specified.", dutyName);
+      return;
+    }
 
     ScheduledExecutors.scheduleWithFixedDelay(
         exec,
@@ -108,13 +113,13 @@ public class OverlordDutyExecutor
             duty.run();
           }
           catch (Exception e) {
-            log.error(e, "Error while running duty [%s]", dutyName);
+            log.error(e, "Error while running duty[%s]", dutyName);
           }
         }
     );
 
     log.info(
-        "Scheduled overlord duty [%s] with initial delay [%d], period [%d].",
+        "Scheduled overlord duty[%s] with initial delay[%d], period[%d].",
         dutyName, schedule.getInitialDelayMillis(), schedule.getPeriodMillis()
     );
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java
new file mode 100644
index 00000000000..bce95ffa258
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link OverlordDuty} to delete unused segments from metadata store and the
+ * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments
+ * of a single datasource-interval.
+ *
+ * @see SegmentsMetadataManagerConfig to enable the cleanup
+ * @see org.apache.druid.server.coordinator.duty.KillUnusedSegments for legacy
+ * mode of killing unused segments via Coordinator duties
+ */
public class UnusedSegmentsKiller implements OverlordDuty
{
  private static final EmittingLogger log = new EmittingLogger(UnusedSegmentsKiller.class);

  // Prefix used in the IDs of embedded kill tasks issued by the Overlord
  private static final String TASK_ID_PREFIX = "overlord-issued";

  // Initial capacity of the kill queue (the queue itself is unbounded)
  private static final int INITIAL_KILL_QUEUE_SIZE = 1000;
  // Upper bounds that keep a single queue rebuild and a single kill task small and fast
  private static final int MAX_INTERVALS_TO_KILL_IN_DATASOURCE = 10_000;
  private static final int MAX_SEGMENTS_TO_KILL_IN_INTERVAL = 1000;

  /**
   * Period after which the queue is reset even if there are existing jobs in queue.
   */
  private static final Duration QUEUE_RESET_PERIOD = Duration.standardDays(1);

  /**
   * Duration for which a kill task is allowed to run.
   */
  private static final Duration MAX_TASK_DURATION = Duration.standardMinutes(10);

  private final ServiceEmitter emitter;
  private final TaskLockbox taskLockbox;
  private final DruidLeaderSelector leaderSelector;
  private final DataSegmentKiller dataSegmentKiller;

  private final UnusedSegmentKillerConfig killConfig;
  private final TaskActionClientFactory taskActionClientFactory;
  private final IndexerMetadataStorageCoordinator storageCoordinator;

  /**
   * Single-threaded executor to process kill jobs.
   */
  private final ScheduledExecutorService exec;

  // Leader term seen on the last run; used to detect leadership changes
  private int previousLeaderTerm;

  // Time at which the kill queue was last rebuilt; null if never rebuilt in this term
  private final AtomicReference<DateTime> lastResetTime = new AtomicReference<>(null);

  // Info of the currently running embedded kill task, if any
  private final AtomicReference<TaskInfo> currentTaskInfo = new AtomicReference<>(null);

  /**
   * Queue of kill candidates. Use a PriorityBlockingQueue to ensure thread-safety
   * since this queue is accessed by both {@link #run()} and {@link #startNextJobInKillQueue}.
   */
  private final PriorityBlockingQueue<KillCandidate> killQueue;
+
+  @Inject
+  public UnusedSegmentsKiller(
+      SegmentsMetadataManagerConfig config,
+      TaskActionClientFactory taskActionClientFactory,
+      IndexerMetadataStorageCoordinator storageCoordinator,
+      @IndexingService DruidLeaderSelector leaderSelector,
+      ScheduledExecutorFactory executorFactory,
+      DataSegmentKiller dataSegmentKiller,
+      TaskLockbox taskLockbox,
+      ServiceEmitter emitter
+  )
+  {
+    this.emitter = emitter;
+    this.taskLockbox = taskLockbox;
+    this.leaderSelector = leaderSelector;
+    this.dataSegmentKiller = dataSegmentKiller;
+    this.storageCoordinator = storageCoordinator;
+    this.taskActionClientFactory = taskActionClientFactory;
+
+    this.killConfig = config.getKillUnused();
+
+    if (isEnabled()) {
+      this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s");
+      this.killQueue = new PriorityBlockingQueue<>(
+          INITIAL_KILL_QUEUE_SIZE,
+          Ordering.from(Comparators.intervalsByEndThenStart())
+                  .onResultOf(candidate -> candidate.interval)
+      );
+    } else {
+      this.exec = null;
+      this.killQueue = null;
+    }
+  }
+
  /**
   * Whether embedded kill tasks are enabled via the killer config.
   */
  @Override
  public boolean isEnabled()
  {
    return killConfig.isEnabled();
  }
+
+  /**
+   * Ensures that things are moving along and the kill queue is not stuck.
+   * Updates the state if leadership changes or if the queue needs to be reset.
+   */
+  @Override
+  public void run()
+  {
+    if (!isEnabled()) {
+      return;
+    }
+
+    updateStateIfNewLeader();
+    if (shouldRebuildKillQueue()) {
+      // Clear the killQueue to stop further processing of already queued jobs
+      killQueue.clear();
+      exec.submit(() -> {
+        rebuildKillQueue();
+        startNextJobInKillQueue();
+      });
+    }
+
+    // Cancel the current task if it has been running for too long
+    final TaskInfo taskInfo = currentTaskInfo.get();
+    if (taskInfo != null && !taskInfo.future.isDone()
+        && taskInfo.sinceTaskStarted.hasElapsed(MAX_TASK_DURATION)) {
+      log.warn(
+          "Cancelling kill task[%s] as it has been running for [%,d] millis.",
+          taskInfo.taskId, taskInfo.sinceTaskStarted.millisElapsed()
+      );
+      taskInfo.future.cancel(true);
+    }
+  }
+
+  @Override
+  public DutySchedule getSchedule()
+  {
+    if (isEnabled()) {
+      // Check every hour that the kill queue is being processed normally
+      log.info("Scheduling is enabled to launch embedded kill tasks.");
+      return new DutySchedule(Duration.standardHours(1).getMillis(), 
Duration.standardMinutes(1).getMillis());
+    } else {
+      return new DutySchedule(0, 0);
+    }
+  }
+
+  private void updateStateIfNewLeader()
+  {
+    final int currentLeaderTerm = leaderSelector.localTerm();
+    if (currentLeaderTerm != previousLeaderTerm) {
+      previousLeaderTerm = currentLeaderTerm;
+      killQueue.clear();
+      lastResetTime.set(null);
+    }
+  }
+
+  /**
+   * Returns true if the kill queue is empty or if the queue has not been reset
+   * yet or if {@code (lastResetTime + resetPeriod) < (now + 1)}.
+   */
+  private boolean shouldRebuildKillQueue()
+  {
+    final DateTime now = DateTimes.nowUtc().plus(1);
+
+    return killQueue.isEmpty()
+           || lastResetTime.get() == null
+           || lastResetTime.get().plus(QUEUE_RESET_PERIOD).isBefore(now);
+  }
+
+  /**
+   * Clears the kill queue and adds fresh jobs.
+   * This method need not handle race conditions as it is always run on
+   * {@link #exec} which is single-threaded.
+   */
+  private void rebuildKillQueue()
+  {
+    final Stopwatch resetDuration = Stopwatch.createStarted();
+    try {
+      killQueue.clear();
+      if (!leaderSelector.isLeader()) {
+        log.info("Not rebuilding kill queue as we are not leader anymore.");
+        return;
+      }
+
+      final Set<String> dataSources = 
storageCoordinator.retrieveAllDatasourceNames();
+
+      final Map<String, Integer> dataSourceToIntervalCounts = new HashMap<>();
+      for (String dataSource : dataSources) {
+        storageCoordinator.retrieveUnusedSegmentIntervals(dataSource, 
MAX_INTERVALS_TO_KILL_IN_DATASOURCE).forEach(
+            interval -> {
+              dataSourceToIntervalCounts.merge(dataSource, 1, Integer::sum);
+              killQueue.offer(new KillCandidate(dataSource, interval));
+            }
+        );
+      }
+
+      lastResetTime.set(DateTimes.nowUtc());
+      log.info(
+          "Queued [%d] kill jobs for [%d] datasources in [%d] millis.",
+          killQueue.size(), dataSources.size(), resetDuration.millisElapsed()
+      );
+      dataSourceToIntervalCounts.forEach(
+          (dataSource, intervalCount) -> emitMetric(
+              Metric.UNUSED_SEGMENT_INTERVALS,
+              intervalCount,
+              Map.of(DruidMetrics.DATASOURCE, dataSource)
+          )
+      );
+      emitMetric(Metric.QUEUE_RESET_TIME, resetDuration.millisElapsed(), null);
+    }
+    catch (Throwable t) {
+      log.makeAlert(t, "Error while resetting kill queue.");
+    }
+  }
+
+  /**
+   * Launches an {@link EmbeddedKillTask} on the {@link #exec} for the next
+   * {@link KillCandidate} in the {@link #killQueue}. This method returns
+   * immediately.
+   */
+  private void startNextJobInKillQueue()
+  {
+    if (!isEnabled() || !leaderSelector.isLeader()) {
+      return;
+    }
+
+    if (killQueue.isEmpty()) {
+      // If the last entry has been processed, emit the total processing time 
and exit
+      final DateTime lastQueueResetTime = lastResetTime.get();
+      if (lastQueueResetTime != null) {
+        long processTimeMillis = DateTimes.nowUtc().getMillis() - 
lastQueueResetTime.getMillis();
+        emitMetric(Metric.QUEUE_PROCESS_TIME, processTimeMillis, null);
+      }
+      return;
+    }
+
+    try {
+      final KillCandidate candidate = killQueue.poll();
+      if (candidate == null) {
+        return;
+      }
+
+      final String taskId = IdUtils.newTaskId(
+          TASK_ID_PREFIX,
+          KillUnusedSegmentsTask.TYPE,
+          candidate.dataSource,
+          candidate.interval
+      );
+
+      final Future<?> taskFuture = exec.submit(() -> {
+        runKillTask(candidate, taskId);
+        startNextJobInKillQueue();
+      });
+      currentTaskInfo.set(new TaskInfo(taskId, taskFuture));
+    }
+    catch (Throwable t) {
+      log.makeAlert(t, "Error while processing kill queue.");
+      currentTaskInfo.set(null);
+    }
+  }
+
  /**
   * Launches an embedded kill task for the given candidate.
   * Runs the task inline on the calling thread: registers the task with the
   * {@link TaskLockbox}, checks readiness (lock acquisition), runs the task,
   * emits run metrics and finally releases the locks.
   */
  private void runKillTask(KillCandidate candidate, String taskId)
  {
    final Stopwatch taskRunTime = Stopwatch.createStarted();
    final EmbeddedKillTask killTask = new EmbeddedKillTask(
        taskId,
        candidate,
        // Kill only segments last updated before (now - bufferPeriod)
        DateTimes.nowUtc().minus(killConfig.getBufferPeriod())
    );

    final TaskActionClient taskActionClient = taskActionClientFactory.create(killTask);
    final TaskToolbox taskToolbox = KillTaskToolbox.create(taskActionClient, dataSegmentKiller, emitter);

    final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
    IndexTaskUtils.setTaskDimensions(metricBuilder, killTask);

    try {
      // Register with the lockbox first so that isReady() can attempt lock acquisition
      taskLockbox.add(killTask);
      final boolean isReady = killTask.isReady(taskActionClient);
      if (!isReady) {
        // Interval could not be locked (likely held by another task); skip to avoid blocking
        emitter.emit(metricBuilder.setMetric(Metric.SKIPPED_INTERVALS, 1L));
        return;
      }

      final TaskStatus status = killTask.runTask(taskToolbox);

      IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
      emitter.emit(metricBuilder.setMetric(TaskMetrics.RUN_DURATION, taskRunTime.millisElapsed()));
    }
    catch (Throwable t) {
      log.error(t, "Embedded kill task[%s] failed.", killTask.getId());

      // Report the run duration even on failure, with failure status dimensions
      IndexTaskUtils.setTaskStatusDimensions(metricBuilder, TaskStatus.failure(taskId, "Unknown error"));
      emitter.emit(metricBuilder.setMetric(TaskMetrics.RUN_DURATION, taskRunTime.millisElapsed()));
    }
    finally {
      // Always release locks, whether the task succeeded, failed or was skipped
      cleanupLocksSilently(killTask);
      emitMetric(Metric.PROCESSED_KILL_JOBS, 1L, Map.of(DruidMetrics.DATASOURCE, candidate.dataSource));
    }
  }
+
+  private void cleanupLocksSilently(EmbeddedKillTask killTask)
+  {
+    try {
+      taskLockbox.remove(killTask);
+    }
+    catch (Throwable t) {
+      log.error(t, "Error while cleaning up locks for kill task[%s].", 
killTask.getId());
+    }
+  }
+
+  private void emitMetric(String metricName, long value, Map<String, String> 
dimensions)
+  {
+    final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder();
+    if (dimensions != null) {
+      dimensions.forEach(builder::setDimension);
+    }
+    emitter.emit(builder.setMetric(metricName, value));
+  }
+
  /**
   * Represents a single candidate interval that contains unused segments.
   * Immutable value holder queued in the kill queue and ordered by interval.
   */
  private static class KillCandidate
  {
    private final String dataSource;
    private final Interval interval;

    private KillCandidate(String dataSource, Interval interval)
    {
      this.dataSource = dataSource;
      this.interval = interval;
    }
  }
+
  /**
   * Info of the currently running task. The stopwatch is started at
   * construction time so that elapsed run time can be checked later to
   * cancel overdue tasks.
   */
  private static class TaskInfo
  {
    private final Stopwatch sinceTaskStarted;
    private final String taskId;
    private final Future<?> future;

    private TaskInfo(String taskId, Future<?> future)
    {
      this.future = future;
      this.taskId = taskId;
      this.sinceTaskStarted = Stopwatch.createStarted();
    }
  }
+
  /**
   * Embedded kill task. Unlike other task types, this task is not persisted and
   * does not run on a worker or indexer. Hence, it doesn't take up any task slots.
   * To ensure that locks are held very briefly over short segment intervals,
   * this kill task processes:
   * <ul>
   * <li>only 1 unused segment interval</li>
   * <li>only 1 batch of up to 1000 unused segments</li>
   * </ul>
   */
  private class EmbeddedKillTask extends KillUnusedSegmentsTask
  {
    private EmbeddedKillTask(
        String taskId,
        KillCandidate candidate,
        DateTime maxUpdatedTimeOfEligibleSegment
    )
    {
      super(
          taskId,
          candidate.dataSource,
          candidate.interval,
          // NOTE(review): the null args presumably map to optional parameters
          // (e.g. versions / batch size / limit) of the base constructor — confirm
          null,
          Map.of(Tasks.PRIORITY_KEY, Tasks.DEFAULT_EMBEDDED_KILL_TASK_PRIORITY),
          null,
          null,
          maxUpdatedTimeOfEligibleSegment
      );
    }

    @Nullable
    @Override
    protected Integer getNumTotalBatches()
    {
      // Do everything in a single batch so that locks are not held for very long
      return 1;
    }

    @Override
    protected List<DataSegment> fetchNextBatchOfUnusedSegments(TaskToolbox toolbox, int nextBatchSize)
    {
      // Kill only 1000 segments in the batch so that locks are not held for very long
      return storageCoordinator.retrieveUnusedSegmentsWithExactInterval(
          getDataSource(),
          getInterval(),
          getMaxUsedStatusLastUpdatedTime(),
          MAX_SEGMENTS_TO_KILL_IN_INTERVAL
      );
    }

    @Override
    protected void logInfo(String message, Object... args)
    {
      // Reduce the level of embedded task info logs to reduce noise on the Overlord
      log.debug(message, args);
    }
  }
+
+  public static class Metric
+  {
+    public static final String QUEUE_RESET_TIME = 
"segment/kill/queueReset/time";
+    public static final String QUEUE_PROCESS_TIME = 
"segment/kill/queueProcess/time";
+    public static final String PROCESSED_KILL_JOBS = 
"segment/kill/jobsProcessed/count";
+
+    public static final String SKIPPED_INTERVALS = 
"segment/kill/skippedIntervals/count";
+    public static final String UNUSED_SEGMENT_INTERVALS = 
"segment/kill/unusedIntervals/count";
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
index 3a63495928a..2ddd0632291 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java
@@ -30,6 +30,8 @@ import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
@@ -44,7 +46,6 @@ import org.apache.druid.segment.metadata.SegmentSchemaManager;
 import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
 import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
 import 
org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.easymock.EasyMock;
 import org.joda.time.Period;
 import org.junit.rules.ExternalResource;
@@ -55,6 +56,7 @@ public class TaskActionTestKit extends ExternalResource
 
   private TaskStorage taskStorage;
   private TaskLockbox taskLockbox;
+  private StubServiceEmitter emitter;
   private TestDerbyConnector testDerbyConnector;
   private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
   private TaskActionToolbox taskActionToolbox;
@@ -64,6 +66,21 @@ public class TaskActionTestKit extends ExternalResource
   private boolean useSegmentMetadataCache = false;
   private boolean skipSegmentPayloadFetchForAllocation = new 
TaskLockConfig().isBatchAllocationReduceMetadataIO();
 
  // Exposes the stub emitter so that tests can verify emitted metrics and alerts
  public StubServiceEmitter getServiceEmitter()
  {
    return emitter;
  }

  // Exposes the in-memory Derby connector backing the metadata store
  public TestDerbyConnector getTestDerbyConnector()
  {
    return testDerbyConnector;
  }

  // Exposes the metadata table config used by the test metadata store
  public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
  {
    return metadataStorageTablesConfig;
  }
+
   public TaskLockbox getTaskLockbox()
   {
     return taskLockbox;
@@ -97,6 +114,7 @@ public class TaskActionTestKit extends ExternalResource
   @Override
   public void before()
   {
+    emitter = new StubServiceEmitter();
     taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new 
Period("PT24H")));
     testDerbyConnector = new TestDerbyConnector(
         Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
@@ -109,7 +127,7 @@ public class TaskActionTestKit extends ExternalResource
         testDerbyConnector
     );
 
-    final SqlSegmentMetadataTransactionFactory transactionFactory = 
setupTransactionFactory(objectMapper);
+    final SqlSegmentMetadataTransactionFactory transactionFactory = 
setupTransactionFactory(objectMapper, emitter);
     metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
         transactionFactory,
         objectMapper,
@@ -142,10 +160,10 @@ public class TaskActionTestKit extends ExternalResource
             taskLockbox,
             taskLockConfig,
             metadataStorageCoordinator,
-            NoopServiceEmitter.instance(),
+            emitter,
             ScheduledExecutors::fixed
         ),
-        NoopServiceEmitter.instance(),
+        emitter,
         EasyMock.createMock(SupervisorManager.class),
         objectMapper
     );
@@ -163,7 +181,10 @@ public class TaskActionTestKit extends ExternalResource
     syncSegmentMetadataCache();
   }
 
-  private SqlSegmentMetadataTransactionFactory 
setupTransactionFactory(ObjectMapper objectMapper)
+  private SqlSegmentMetadataTransactionFactory setupTransactionFactory(
+      ObjectMapper objectMapper,
+      ServiceEmitter emitter
+  )
   {
     metadataCachePollExec = new 
BlockingExecutorService("test-cache-poll-exec");
     SegmentMetadataCache.UsageMode cacheMode
@@ -173,12 +194,12 @@ public class TaskActionTestKit extends ExternalResource
 
     segmentMetadataCache = new HeapMemorySegmentMetadataCache(
         objectMapper,
-        Suppliers.ofInstance(new 
SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
+        Suppliers.ofInstance(new 
SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode, null)),
         Suppliers.ofInstance(metadataStorageTablesConfig),
         new NoopSegmentSchemaCache(),
         testDerbyConnector,
         (poolSize, name) -> new WrappingScheduledExecutorService(name, 
metadataCachePollExec, false),
-        NoopServiceEmitter.instance()
+        emitter
     );
 
     final TestDruidLeaderSelector leaderSelector = new 
TestDruidLeaderSelector();
@@ -190,7 +211,7 @@ public class TaskActionTestKit extends ExternalResource
         testDerbyConnector,
         leaderSelector,
         segmentMetadataCache,
-        NoopServiceEmitter.instance()
+        emitter
     )
     {
       @Override
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 848a2bb91ce..9972340ae0c 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -181,7 +181,7 @@ public abstract class IngestionTestBase extends 
InitializedNullHandlingTest
         segmentMetadataCache,
         segmentSchemaCache,
         derbyConnectorRule.getConnector(),
-        () -> new SegmentsMetadataManagerConfig(null, null),
+        () -> new SegmentsMetadataManagerConfig(null, null, null),
         derbyConnectorRule.metadataTablesConfigSupplier(),
         CentralizedDatasourceSchemaConfig::create,
         NoopServiceEmitter.instance(),
@@ -323,7 +323,7 @@ public abstract class IngestionTestBase extends 
InitializedNullHandlingTest
           : SegmentMetadataCache.UsageMode.NEVER;
     segmentMetadataCache = new HeapMemorySegmentMetadataCache(
         objectMapper,
-        Suppliers.ofInstance(new 
SegmentsMetadataManagerConfig(Period.millis(10), cacheMode)),
+        Suppliers.ofInstance(new 
SegmentsMetadataManagerConfig(Period.millis(10), cacheMode, null)),
         derbyConnectorRule.metadataTablesConfigSupplier(),
         segmentSchemaCache,
         derbyConnectorRule.getConnector(),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java
index 00169b0a955..bc6be505c62 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java
@@ -36,11 +36,11 @@ public class OverlordDutyExecutorTest
   {
     OverlordDuty testDuty1 = Mockito.mock(OverlordDuty.class);
     Mockito.when(testDuty1.isEnabled()).thenReturn(true);
-    Mockito.when(testDuty1.getSchedule()).thenReturn(new DutySchedule(0, 0));
+    Mockito.when(testDuty1.getSchedule()).thenReturn(new DutySchedule(1, 0));
 
     OverlordDuty testDuty2 = Mockito.mock(OverlordDuty.class);
     Mockito.when(testDuty2.isEnabled()).thenReturn(true);
-    Mockito.when(testDuty2.getSchedule()).thenReturn(new DutySchedule(0, 0));
+    Mockito.when(testDuty2.getSchedule()).thenReturn(new DutySchedule(1, 0));
 
     ScheduledExecutorFactory executorFactory = 
Mockito.mock(ScheduledExecutorFactory.class);
     ScheduledExecutorService executorService = 
Mockito.mock(ScheduledExecutorService.class);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
new file mode 100644
index 00000000000..64a551fdee4
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionTestKit;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
+import org.apache.druid.indexing.test.TestDataSegmentKiller;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
+import 
org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class UnusedSegmentsKillerTest
+{
  @Rule
  public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();

  // 10 daily segments of the wiki datasource, each of size 500
  private static final List<DataSegment> WIKI_SEGMENTS_1X10D =
      CreateDataSegments.ofDatasource(TestDataSource.WIKI)
                        .forIntervals(10, Granularities.DAY)
                        .eachOfSize(500);

  private StubServiceEmitter emitter;
  private UnusedSegmentsKiller killer;
  // Blocking executor lets tests step through queued kill jobs deterministically
  private BlockingExecutorService killExecutor;
  private UnusedSegmentKillerConfig killerConfig;
  private TestDruidLeaderSelector leaderSelector;
  private TestDataSegmentKiller dataSegmentKiller;
  private IndexerMetadataStorageCoordinator storageCoordinator;
+
  @Before
  public void setup()
  {
    emitter = taskActionTestKit.getServiceEmitter();
    leaderSelector = new TestDruidLeaderSelector();
    dataSegmentKiller = new TestDataSegmentKiller();
    // Zero buffer period so unused segments become eligible for kill immediately
    killerConfig = new UnusedSegmentKillerConfig(true, Period.ZERO);
    killExecutor = new BlockingExecutorService("UnusedSegmentsKillerTest-%s");
    storageCoordinator = taskActionTestKit.getMetadataStorageCoordinator();
    initKiller();
  }
+
  /**
   * (Re-)creates the {@code UnusedSegmentsKiller} under test using the current
   * {@link #killerConfig} and test doubles for its collaborators.
   */
  private void initKiller()
  {
    killer = new UnusedSegmentsKiller(
        new SegmentsMetadataManagerConfig(
            null,
            SegmentMetadataCache.UsageMode.ALWAYS,
            killerConfig
        ),
        task -> new LocalTaskActionClient(task, taskActionTestKit.getTaskActionToolbox()),
        storageCoordinator,
        leaderSelector,
        // Route the killer's executor onto the blocking test executor
        (corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(nameFormat, killExecutor, true),
        dataSegmentKiller,
        taskActionTestKit.getTaskLockbox(),
        taskActionTestKit.getServiceEmitter()
    );
  }
+
  // Executes all kill jobs currently queued on the blocking test executor
  private void finishQueuedKillJobs()
  {
    killExecutor.finishAllPendingTasks();
  }
+
  // Verifies the hourly schedule (1 minute initial delay) when the killer is enabled
  @Test
  public void test_getSchedule_returnsOneHourPeriod_ifEnabled()
  {
    final DutySchedule schedule = killer.getSchedule();
    Assert.assertEquals(Duration.standardHours(1).getMillis(), schedule.getPeriodMillis());
    Assert.assertEquals(Duration.standardMinutes(1).getMillis(), schedule.getInitialDelayMillis());
  }
+
  // Verifies that a disabled killer returns a zero schedule (duty never runs)
  @Test
  public void test_getSchedule_returnsZeroPeriod_ifDisabled()
  {
    killerConfig = new UnusedSegmentKillerConfig(false, null);
    initKiller();

    final DutySchedule schedule = killer.getSchedule();
    Assert.assertEquals(0, schedule.getPeriodMillis());
    Assert.assertEquals(0, schedule.getInitialDelayMillis());
  }
+
  // run() should submit work to the executor when the killer is enabled
  @Test
  public void test_run_startsProcessing_ifEnabled()
  {
    Assert.assertFalse(killExecutor.hasPendingTasks());
    Assert.assertTrue(killer.isEnabled());

    killer.run();
    Assert.assertTrue(killExecutor.hasPendingTasks());
  }
+
  // run() should do nothing at all when the killer is disabled
  @Test
  public void test_run_isNoop_ifDisabled()
  {
    killerConfig = new UnusedSegmentKillerConfig(false, null);
    initKiller();

    Assert.assertFalse(killer.isEnabled());

    killer.run();
    Assert.assertFalse(killExecutor.hasPendingTasks());
  }
+
  // Jobs queued while leader must not be processed after leadership is lost
  @Test
  public void test_run_doesNotProcessSegments_ifNotLeader()
  {
    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    leaderSelector.becomeLeader();
    killer.run();

    // Lose leadership before the queued jobs are executed
    leaderSelector.stopBeingLeader();

    Assert.assertTrue(killExecutor.hasPendingTasks());

    finishQueuedKillJobs();
    emitter.verifyNotEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS);
    Assert.assertFalse(killExecutor.hasPendingTasks());
  }
+
  // Happy path: all 10 unused daily segments are killed by 10 embedded tasks
  @Test
  public void test_run_launchesEmbeddedKillTasks_ifLeader()
  {
    leaderSelector.becomeLeader();

    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    // Reset the queue and verify that kill jobs have been added to the queue
    killer.run();
    Assert.assertTrue(killExecutor.hasPendingTasks());
    emitter.verifyNotEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS);

    finishQueuedKillJobs();
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);

    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.QUEUE_PROCESS_TIME, 1);
    emitter.verifyEmitted(TaskMetrics.RUN_DURATION, 10);

    // Each of the 10 tasks deletes exactly one segment from metadata and deep storage
    emitter.verifyEmitted(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10);
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10L);

    emitter.verifyEmitted(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 10);
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 10L);

    Assert.assertTrue(
        retrieveUnusedSegments(Intervals.ETERNITY).isEmpty()
    );
  }
+
  // A single embedded task kills at most 1000 segments of one interval per run
  @Test
  public void test_maxSegmentsKilledInAnInterval_is_1k()
  {
    leaderSelector.becomeLeader();

    // 2000 unused segments, all in the same day interval
    final List<DataSegment> segments =
        CreateDataSegments.ofDatasource(TestDataSource.WIKI)
                          .forIntervals(1, Granularities.DAY)
                          .withNumPartitions(2000)
                          .eachOfSizeInMb(50);

    storageCoordinator.commitSegments(Set.copyOf(segments), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    Assert.assertEquals(
        2000,
        retrieveUnusedSegments(segments.get(0).getInterval()).size()
    );

    // Reset the kill queue and execute kill tasks
    killer.run();
    finishQueuedKillJobs();

    // Verify that a single kill task has run which killed 1k segments
    emitter.verifyEmitted(TaskMetrics.RUN_DURATION, 1);
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 1000L);

    // The remaining 1000 segments are left for a later run
    Assert.assertEquals(
        1000,
        retrieveUnusedSegments(segments.get(0).getInterval()).size()
    );
  }
+
  // One queue rebuild enqueues at most 10k intervals per datasource
  @Test(timeout = 20_000L)
  public void test_maxIntervalsKilledInADatasource_is_10k()
  {
    leaderSelector.becomeLeader();

    // 20k unused daily intervals, one segment each
    final List<DataSegment> segments =
        CreateDataSegments.ofDatasource(TestDataSource.WIKI)
                          .forIntervals(20_000, Granularities.DAY)
                          .eachOfSizeInMb(50);

    storageCoordinator.commitSegments(Set.copyOf(segments), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    Assert.assertEquals(
        20_000,
        retrieveUnusedSegments(Intervals.ETERNITY).size()
    );

    // Reset the kill queue and execute kill tasks
    killer.run();
    finishQueuedKillJobs();

    // Verify that 10k kill tasks have run, each killing a single segment
    emitter.verifyEmitted(TaskMetrics.RUN_DURATION, 10000);
    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10_000L);

    // The other 10k intervals remain for the next queue rebuild
    Assert.assertEquals(
        10_000,
        retrieveUnusedSegments(Intervals.ETERNITY).size()
    );
  }
+
  // Losing and regaining leadership should trigger a fresh queue rebuild
  @Test
  public void test_run_resetsQueue_ifLeadershipIsReacquired()
  {
    leaderSelector.becomeLeader();

    // Verify that the queue has been reset
    killer.run();
    finishQueuedKillJobs();
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.QUEUE_RESET_TIME, 1);
    emitter.verifyNotEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS);

    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    // Lose and reacquire leadership
    leaderSelector.stopBeingLeader();
    leaderSelector.becomeLeader();

    // Run again and verify that queue has been reset
    emitter.flush();
    killer.run();
    finishQueuedKillJobs();
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.QUEUE_RESET_TIME, 1);
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);
  }
+
  // A non-empty queue reset less than a day ago must not be rebuilt by run()
  @Test
  public void test_run_doesNotResetQueue_ifThereArePendingJobs_andLastRunWasLessThanOneDayAgo()
  {
    leaderSelector.becomeLeader();

    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);

    // Invoke run, reset the queue and process only some of the jobs
    // (6 executor tasks = 1 rebuild + 5 kill jobs)
    killer.run();
    killExecutor.finishNextPendingTasks(6);
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.QUEUE_RESET_TIME, 1);
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 5);

    Assert.assertTrue(killExecutor.hasPendingTasks());

    // Invoke run again and verify that queue has not been reset
    emitter.flush();
    killer.run();
    finishQueuedKillJobs();
    emitter.verifyNotEmitted(UnusedSegmentsKiller.Metric.QUEUE_RESET_TIME);
    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 5);

    // All jobs have been processed
    Assert.assertFalse(killExecutor.hasPendingTasks());
  }
+
+  @Test
+  public void test_run_prioritizesOlderIntervals()
+  {
+    leaderSelector.becomeLeader();
+
+    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
+    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);
+
+    killer.run();
+    finishQueuedKillJobs();
+    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);
+
+    // Verify that the kill intervals are sorted with the oldest interval first
+    final List<StubServiceEmitter.ServiceMetricEventSnapshot> events =
+        emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION);
+    final List<Interval> killIntervals = events.stream().map(event -> {
+      final String taskId = (String) 
event.getUserDims().get(DruidMetrics.TASK_ID);
+      String[] splits = taskId.split("_");
+      return Intervals.of(splits[4] + "/" + splits[5]);
+    }).collect(Collectors.toList());
+
+    Assert.assertEquals(10, killIntervals.size());
+
+    final List<Interval> expectedIntervals =
+        WIKI_SEGMENTS_1X10D.stream()
+                           .map(DataSegment::getInterval)
+                           .sorted(Comparators.intervalsByEndThenStart())
+                           .collect(Collectors.toList());
+    Assert.assertEquals(expectedIntervals, killIntervals);
+  }
+
+  @Test
+  public void 
test_run_doesNotDeleteSegmentFiles_ifLoadSpecIsUsedByAnotherSegment()
+  {
+    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
+    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);
+
+    // Add a new segment upgraded from one of the unused segments
+    final DataSegment upgradedSegment1 = 
WIKI_SEGMENTS_1X10D.get(0).withVersion("v2");
+    final DataSegment upgradedSegment2 = 
WIKI_SEGMENTS_1X10D.get(1).withVersion("v2");
+    storageCoordinator.commitSegments(Set.of(upgradedSegment1, 
upgradedSegment2), null);
+
+    leaderSelector.becomeLeader();
+    killer.run();
+
+    // Verify that all unused segments are deleted from metadata store but the
+    // ones with used load specs are not deleted from the deep store
+    finishQueuedKillJobs();
+    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10L);
+    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 8L);
+  }
+
+  @Test
+  public void test_run_doesNotKillSegment_ifUpdatedWithinBufferPeriod()
+  {
+    killerConfig = new UnusedSegmentKillerConfig(true, Period.hours(1));
+    initKiller();
+
+    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
+    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);
+
+    leaderSelector.becomeLeader();
+    killer.run();
+    finishQueuedKillJobs();
+
+    // Verify that tasks are launched but no segment is killed
+    emitter.verifyValue(UnusedSegmentsKiller.Metric.UNUSED_SEGMENT_INTERVALS, 
10L);
+    emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);
+    emitter.verifyEmitted(TaskMetrics.RUN_DURATION, 10);
+
+    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 0L);
+    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 0L);
+  }
+
+  @Test
+  public void test_run_killsFutureSegment()
+  {
+    final List<DataSegment> futureSegments = CreateDataSegments
+        .ofDatasource(TestDataSource.WIKI)
+        .forIntervals(10, Granularities.DAY)
+        .startingAt("2050-01-01")
+        .eachOfSize(500);
+    storageCoordinator.commitSegments(Set.copyOf(futureSegments), null);
+    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);
+
+    leaderSelector.becomeLeader();
+    killer.run();
+    finishQueuedKillJobs();
+
+    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10L);
+    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 10L);
+  }
+
+  @Test
+  public void test_run_skipsLockedIntervals() throws InterruptedException
+  {
+    storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null);
+    storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI);
+
+    // Lock up some of the segment intervals
+    final Interval lockedInterval = new Interval(
+        WIKI_SEGMENTS_1X10D.get(0).getInterval().getStart(),
+        WIKI_SEGMENTS_1X10D.get(4).getInterval().getEnd()
+    );
+
+    final Task ingestionTask = new NoopTask(null, null, TestDataSource.WIKI, 
0L, 0L, null);
+    final TaskLockbox taskLockbox = taskActionTestKit.getTaskLockbox();
+
+    try {
+      taskLockbox.add(ingestionTask);
+      taskLockbox.lock(
+          ingestionTask,
+          new TimeChunkLockRequest(TaskLockType.APPEND, ingestionTask, 
lockedInterval, null)
+      );
+
+      leaderSelector.becomeLeader();
+      killer.run();
+      finishQueuedKillJobs();
+
+      // Verify that unused segments from locked intervals are not killed
+      emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 5L);
+      emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 5L);
+      emitter.verifySum(UnusedSegmentsKiller.Metric.SKIPPED_INTERVALS, 5L);
+    }
+    finally {
+      taskLockbox.remove(ingestionTask);
+    }
+
+    // Do another run to clean up the rest of the segments
+    killer.run();
+    finishQueuedKillJobs();
+    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10L);
+    emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 10L);
+  }
+
+  private List<DataSegment> retrieveUnusedSegments(Interval interval)
+  {
+    return storageCoordinator.retrieveUnusedSegmentsForInterval(
+        TestDataSource.WIKI,
+        interval,
+        null,
+        null
+    );
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index bd8d6481ba6..d0e639db2c2 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -56,6 +56,29 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
 
   private int deleteSegmentsCount = 0;
 
+  @Override
+  public Set<String> retrieveAllDatasourceNames()
+  {
+    return Set.of();
+  }
+
+  @Override
+  public List<Interval> retrieveUnusedSegmentIntervals(String dataSource, int 
limit)
+  {
+    return List.of();
+  }
+
+  @Override
+  public List<DataSegment> retrieveUnusedSegmentsWithExactInterval(
+      String dataSource,
+      Interval interval,
+      DateTime maxUpdatedTime,
+      int limit
+  )
+  {
+    return List.of();
+  }
+
   @Override
   public DataSourceMetadata retrieveDataSourceMetadata(String dataSource)
   {
@@ -80,12 +103,6 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
     return false;
   }
 
-  @Override
-  public Set<String> retrieveAllDatasourceNames()
-  {
-    return Set.of();
-  }
-
   @Override
   public Set<DataSegment> retrieveAllUsedSegments(String dataSource, Segments 
visibility)
   {
@@ -286,10 +303,11 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
   }
 
   @Override
-  public void deleteSegments(Set<DataSegment> segments)
+  public int deleteSegments(Set<DataSegment> segments)
   {
     deleteSegmentsCount++;
     nuked.addAll(segments);
+    return segments.size();
   }
 
   @Override
diff --git 
a/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java 
b/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java
index a877d2bf7b1..750f9090c26 100644
--- 
a/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java
+++ 
b/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java
@@ -56,7 +56,7 @@ public class KillTaskReport implements TaskReport
 
   @Override
   @JsonProperty
-  public Object getPayload()
+  public Stats getPayload()
   {
     return stats;
   }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
 
b/processing/src/main/java/org/apache/druid/segment/loading/SegmentKillResult.java
similarity index 52%
copy from 
server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
copy to 
processing/src/main/java/org/apache/druid/segment/loading/SegmentKillResult.java
index cb02f7c8f38..ff3c3b8a98b 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/loading/SegmentKillResult.java
@@ -17,35 +17,33 @@
  * under the License.
  */
 
-package org.apache.druid.metadata.segment;
+package org.apache.druid.segment.loading;
 
-import org.skife.jdbi.v2.Handle;
-
-import java.io.Closeable;
+import java.util.List;
 
 /**
- * Represents a single transaction involving read of segment metadata into
- * the metadata store. A transaction is associated with a single instance of a
- * {@link Handle} and is meant to be short-lived.
+ * Result of killing data segments using {@link DataSegmentKiller}.
  */
-public interface SegmentMetadataReadTransaction
-    extends DatasourceSegmentMetadataReader, Closeable
+public class SegmentKillResult
 {
-  /**
-   * @return The JDBI handle used in this transaction
-   */
-  Handle getHandle();
+  private static final SegmentKillResult EMPTY_INSTANCE = new 
SegmentKillResult(List.of());
+
+  public static SegmentKillResult empty()
+  {
+    return EMPTY_INSTANCE;
+  }
 
-  /**
-   * Completes the transaction by either committing it or rolling it back.
-   */
-  @Override
-  void close();
+  private final List<String> deletedPaths;
 
-  @FunctionalInterface
-  interface Callback<T>
+  public SegmentKillResult(
+      List<String> deletedPaths
+  )
   {
-    T inTransaction(SegmentMetadataReadTransaction transaction) throws 
Exception;
+    this.deletedPaths = deletedPaths;
   }
 
+  public List<String> getDeletedPaths()
+  {
+    return deletedPaths;
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java
 
b/processing/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java
index 98e538e3b06..5ce60dcfaeb 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java
@@ -22,6 +22,7 @@ package org.apache.druid.java.util.metrics;
 import org.apache.druid.java.util.common.StringUtils;
 import org.junit.Assert;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
@@ -50,7 +51,7 @@ public interface MetricsVerifier
    * Verifies that the metric was emitted for the given dimension filters the
    * expected number of times.
    */
-  default void verifyEmitted(String metricName, Map<String, Object> 
dimensionFilters, int times)
+  default void verifyEmitted(String metricName, @Nullable Map<String, Object> 
dimensionFilters, int times)
   {
     Assert.assertEquals(
         StringUtils.format("Metric [%s] was emitted unexpected number of 
times.", metricName),
@@ -71,7 +72,7 @@ public interface MetricsVerifier
    * Verifies the value of the event corresponding to the specified metric and
    * dimensionFilters emitted in the previous run.
    */
-  default void verifyValue(String metricName, Map<String, Object> 
dimensionFilters, Number expectedValue)
+  default void verifyValue(String metricName, @Nullable Map<String, Object> 
dimensionFilters, Number expectedValue)
   {
     Assert.assertEquals(expectedValue, getValue(metricName, dimensionFilters));
   }
@@ -80,7 +81,7 @@ public interface MetricsVerifier
    * Gets the value of the event corresponding to the specified metric and
    * dimensionFilters.
    */
-  default Number getValue(String metricName, Map<String, Object> 
dimensionFilters)
+  default Number getValue(String metricName, @Nullable Map<String, Object> 
dimensionFilters)
   {
     List<Number> values = getMetricValues(metricName, dimensionFilters);
     Assert.assertEquals(
@@ -91,9 +92,36 @@ public interface MetricsVerifier
     return values.get(0);
   }
 
+  /**
+   * Verifies that the sum of values of all events corresponding to the 
specified metric equals the expected sum.
+   */
+  default void verifySum(String metricName, long expectedSum)
+  {
+    verifySum(metricName, null, expectedSum);
+  }
+
+  /**
+   * Verifies that the sum of values of all events corresponding to the specified metric
+   * and dimensionFilters equals the expected sum.
+   */
+  default void verifySum(String metricName, @Nullable Map<String, Object> 
dimensionFilters, long expectedSum)
+  {
+    long observedSum = getMetricValues(metricName, dimensionFilters)
+        .stream()
+        .mapToLong(Number::longValue)
+        .sum();
+    Assert.assertEquals(
+        StringUtils.format(
+            "Unexpected sum[%s] of metric[%s] with filters[%s]",
+            observedSum, metricName, dimensionFilters
+        ),
+        expectedSum, observedSum
+    );
+  }
+
   /**
    * Gets the metric values for the specified dimension filters.
    */
-  List<Number> getMetricValues(String metricName, Map<String, Object> 
dimensionFilters);
+  List<Number> getMetricValues(String metricName, @Nullable Map<String, 
Object> dimensionFilters);
 
 }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index db62686cd59..13da5aa3ac8 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -150,6 +150,26 @@ public interface IndexerMetadataStorageCoordinator
       @Nullable DateTime maxUsedStatusLastUpdatedTime
   );
 
+  /**
+   * Retrieves unused segments from the metadata store that match the given
+   * interval exactly. There is no guarantee on the order of segments in the 
list
+   * or on whether the limited list contains the highest or lowest segment IDs
+   * in the interval.
+   *
+   * @param interval       Returned segments must exactly match this interval.
+   * @param maxUpdatedTime Returned segments must have a {@code 
used_status_last_updated}
+   *                       which is either null or earlier than this value.
+   * @param limit          Maximum number of segments to return.
+   *
+   * @return Unsorted list of unused segments that match the given parameters.
+   */
+  List<DataSegment> retrieveUnusedSegmentsWithExactInterval(
+      String dataSource,
+      Interval interval,
+      DateTime maxUpdatedTime,
+      int limit
+  );
+
   /**
    * Retrieves segments for the given IDs, regardless of their visibility
    * (visible, overshadowed or unused).
@@ -456,7 +476,12 @@ public interface IndexerMetadataStorageCoordinator
 
   void updateSegmentMetadata(Set<DataSegment> segments);
 
-  void deleteSegments(Set<DataSegment> segments);
+  /**
+   * Deletes unused segments from the metadata store.
+   *
+   * @return Number of segments actually deleted.
+   */
+  int deleteSegments(Set<DataSegment> segments);
 
   /**
    * Retrieve the segment for a given id from the metadata store. Return null 
if no such segment exists
@@ -556,6 +581,15 @@ public interface IndexerMetadataStorageCoordinator
       DateTime maxUsedStatusLastUpdatedTime
   );
 
+  /**
+   * Retrieves intervals of the specified datasource that contain any unused 
segments.
+   * There is no guarantee on the order of intervals in the list or on whether
+   * the limited list contains the earliest or latest intervals of the 
datasource.
+   *
+   * @return Unsorted list of unused segment intervals containing up to {@code 
limit} entries.
+   */
+  List<Interval> retrieveUnusedSegmentIntervals(String dataSource, int limit);
+
   /**
    * Returns the number of segment entries in the database whose state was 
changed as the result of this call (that is,
    * the segments were marked as used). If the call results in a database 
error, an exception is relayed to the caller.
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 8ede92dc9c5..722b3a8155e 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -48,7 +48,6 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.segment.DatasourceSegmentMetadataWriter;
 import org.apache.druid.metadata.segment.SegmentMetadataReadTransaction;
 import org.apache.druid.metadata.segment.SegmentMetadataTransaction;
 import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory;
@@ -158,6 +157,14 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     );
   }
 
+  @Override
+  public List<Interval> retrieveUnusedSegmentIntervals(String dataSource, int 
limit)
+  {
+    return inReadOnlyTransaction(
+        sql -> sql.retrieveUnusedSegmentIntervals(dataSource, limit)
+    );
+  }
+
   @Override
   public Set<DataSegment> retrieveUsedSegmentsForIntervals(
       final String dataSource,
@@ -225,7 +232,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   {
     final List<DataSegment> matchingSegments = inReadOnlyDatasourceTransaction(
         dataSource,
-        transaction -> transaction.findUnusedSegments(
+        transaction -> transaction.noCacheSql().findUnusedSegments(
+            dataSource,
             interval,
             versions,
             limit,
@@ -241,6 +249,24 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     return matchingSegments;
   }
 
+  @Override
+  public List<DataSegment> retrieveUnusedSegmentsWithExactInterval(
+      String dataSource,
+      Interval interval,
+      DateTime maxUpdatedTime,
+      int limit
+  )
+  {
+    return inReadOnlyTransaction(
+        sql -> sql.retrieveUnusedSegmentsWithExactInterval(
+            dataSource,
+            interval,
+            maxUpdatedTime,
+            limit
+        )
+    );
+  }
+
   @Override
   public Set<DataSegment> retrieveSegmentsById(String dataSource, Set<String> 
segmentIds)
   {
@@ -1574,7 +1600,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     }
 
     // If yes, try to compute allocated partition num using the max unused 
segment shard spec
-    SegmentId unusedMaxId = transaction.findHighestUnusedSegmentId(
+    SegmentId unusedMaxId = 
transaction.noCacheSql().retrieveHighestUnusedSegmentId(
+        allocatedId.getDataSource(),
         allocatedId.getInterval(),
         allocatedId.getVersion()
     );
@@ -1617,7 +1644,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   {
     return inReadWriteDatasourceTransaction(
         dataSource,
-        DatasourceSegmentMetadataWriter::deleteAllPendingSegments
+        SegmentMetadataTransaction::deleteAllPendingSegments
     );
   }
 
@@ -2305,11 +2332,11 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   }
 
   @Override
-  public void deleteSegments(final Set<DataSegment> segments)
+  public int deleteSegments(final Set<DataSegment> segments)
   {
     if (segments.isEmpty()) {
       log.info("No segments to delete.");
-      return;
+      return 0;
     }
 
     final String dataSource = verifySegmentsToCommit(segments);
@@ -2322,7 +2349,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     );
 
     log.debugSegments(segments, "Delete the metadata of segments");
-    log.info("Deleted [%d] segments from metadata storage for dataSource 
[%s].", numDeletedSegments, dataSource);
+    return numDeletedSegments;
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
 
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
index f53f3108221..5718a0bdc9a 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.metadata;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.common.config.Configs;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
 import org.joda.time.Period;
 
@@ -39,14 +40,29 @@ public class SegmentsMetadataManagerConfig
   @JsonProperty
   private final SegmentMetadataCache.UsageMode useIncrementalCache;
 
+  @JsonProperty
+  private final UnusedSegmentKillerConfig killUnused;
+
   @JsonCreator
   public SegmentsMetadataManagerConfig(
       @JsonProperty("pollDuration") Period pollDuration,
-      @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode 
useIncrementalCache
+      @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode 
useIncrementalCache,
+      @JsonProperty("killUnused") UnusedSegmentKillerConfig killUnused
   )
   {
     this.pollDuration = Configs.valueOrDefault(pollDuration, 
Period.minutes(1));
     this.useIncrementalCache = Configs.valueOrDefault(useIncrementalCache, 
SegmentMetadataCache.UsageMode.NEVER);
+    this.killUnused = Configs.valueOrDefault(killUnused, new 
UnusedSegmentKillerConfig(null, null));
+    if (this.killUnused.isEnabled() && this.useIncrementalCache == 
SegmentMetadataCache.UsageMode.NEVER) {
+      throw DruidException
+          .forPersona(DruidException.Persona.OPERATOR)
+          .ofCategory(DruidException.Category.INVALID_INPUT)
+          .build(
+              "Segment metadata cache must be enabled to allow killing of 
unused segments."
+              + " Set 'druid.manager.segments.useIncrementalCache=always'"
+              + " or 'druid.manager.segments.useIncrementalCache=ifSynced' to 
enable the cache."
+          );
+    }
   }
 
   /**
@@ -61,4 +77,9 @@ public class SegmentsMetadataManagerConfig
   {
     return pollDuration;
   }
+
+  public UnusedSegmentKillerConfig getKillUnused()
+  {
+    return killUnused;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index de2e35813c4..2779c16da17 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.UnmodifiableIterator;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.DateTimes;
@@ -187,8 +188,9 @@ public class SqlSegmentsMetadataQuery
   }
 
   /**
-   * Determines the highest ID amongst unused segments for the given 
datasource,
-   * interval and version.
+   * Retrieves the ID of the unused segment that has the highest partition
+   * number amongst all unused segments that exactly match the given interval
+   * and version.
    *
    * @return null if no unused segment exists for the given parameters.
    */
@@ -286,6 +288,37 @@ public class SqlSegmentsMetadataQuery
     }
   }
 
+  /**
+   * Retrieves unused segments that are fully contained within the given 
interval.
+   *
+   * @param interval       Returned segments must be fully contained within 
this
+   *                       interval
+   * @param versions       Optional list of segment versions. If passed as 
null,
+   *                       all segment versions are eligible.
+   * @param limit          Maximum number of segments to return. If passed as 
null,
+   *                       all segments are returned.
+   * @param maxUpdatedTime Returned segments must have a {@code 
used_status_last_updated}
+   *                       which is either null or earlier than this value.
+   */
+  public List<DataSegment> findUnusedSegments(
+      String dataSource,
+      Interval interval,
+      @Nullable List<String> versions,
+      @Nullable Integer limit,
+      @Nullable DateTime maxUpdatedTime
+  )
+  {
+    try (
+        final CloseableIterator<DataSegment> iterator =
+            retrieveUnusedSegments(dataSource, List.of(interval), versions, 
limit, null, null, maxUpdatedTime)
+    ) {
+      return ImmutableList.copyOf(iterator);
+    }
+    catch (IOException e) {
+      throw InternalServerError.exception(e, "Error while reading unused 
segments");
+    }
+  }
+
   /**
    * Retrieves segments for a given datasource that are marked unused and that 
are <b>fully contained by</b> any interval
    * in a particular collection of intervals. If the collection of intervals 
is empty, this method will retrieve all
@@ -1027,6 +1060,74 @@ public class SqlSegmentsMetadataQuery
     return 
unusedIntervals.stream().filter(Objects::nonNull).collect(Collectors.toList());
   }
 
+  /**
+   * Gets unused segment intervals for the specified datasource. There is no
+   * guarantee on the order of intervals in the list or on whether the limited
+   * list contains the earliest or latest intervals present in the datasource.
+   *
+   * @return List of unused segment intervals containing up to {@code limit} 
entries.
+   */
+  public List<Interval> retrieveUnusedSegmentIntervals(String dataSource, int 
limit)
+  {
+    final String sql = StringUtils.format(
+        "SELECT start, %2$send%2$s FROM %1$s"
+        + " WHERE dataSource = :dataSource AND used = false"
+        + " GROUP BY start, %2$send%2$s"
+        + "  %3$s",
+        dbTables.getSegmentsTable(), connector.getQuoteString(), 
connector.limitClause(limit)
+    );
+
+    final List<Interval> intervals = connector.inReadOnlyTransaction(
+        (handle, status) ->
+            handle.createQuery(sql)
+                  .setFetchSize(connector.getStreamingFetchSize())
+                  .bind("dataSource", dataSource)
+                  .map((index, r, ctx) -> mapToInterval(r, dataSource))
+                  .list()
+    );
+
+    return 
intervals.stream().filter(Objects::nonNull).collect(Collectors.toList());
+  }
+
+  /**
+   * Retrieves unused segments that exactly match the given interval.
+   *
+   * @param interval       Returned segments must exactly match this interval.
+   * @param maxUpdatedTime Returned segments must have a {@code 
used_status_last_updated}
+   *                       which is either null or earlier than this value.
+   * @param limit          Maximum number of segments to return
+   */
+  public List<DataSegment> retrieveUnusedSegmentsWithExactInterval(
+      String dataSource,
+      Interval interval,
+      DateTime maxUpdatedTime,
+      int limit
+  )
+  {
+    final String sql = StringUtils.format(
+        "SELECT id, payload FROM %1$s"
+        + " WHERE dataSource = :dataSource AND used = false"
+        + " AND %2$send%2$s = :end AND start = :start"
+        + " AND (used_status_last_updated IS NULL OR used_status_last_updated 
<= :maxUpdatedTime)"
+        + "  %3$s",
+        dbTables.getSegmentsTable(), connector.getQuoteString(), 
connector.limitClause(limit)
+    );
+
+    final List<DataSegment> segments = connector.inReadOnlyTransaction(
+        (handle, status) ->
+            handle.createQuery(sql)
+                  .setFetchSize(connector.getStreamingFetchSize())
+                  .bind("dataSource", dataSource)
+                  .bind("start", interval.getStart().toString())
+                  .bind("end", interval.getEnd().toString())
+                  .bind("maxUpdatedTime", maxUpdatedTime.toString())
+                  .map((index, r, ctx) -> mapToSegment(r))
+                  .list()
+    );
+
+    return 
segments.stream().filter(Objects::nonNull).collect(Collectors.toList());
+  }
+
   /**
    * Retrieve the used segment for a given id if it exists in the metadata 
store and null otherwise
    */
@@ -1608,6 +1709,20 @@ public class SqlSegmentsMetadataQuery
     }).iterator();
   }
 
+  @Nullable
+  private DataSegment mapToSegment(ResultSet resultSet)
+  {
+    String segmentId = "";
+    try {
+      segmentId = resultSet.getString("id");
+      return JacksonUtils.readValue(jsonMapper, resultSet.getBytes("payload"), 
DataSegment.class);
+    }
+    catch (Throwable t) {
+      log.error(t, "Could not read segment with ID[%s]", segmentId);
+      return null;
+    }
+  }
+
   private UnmodifiableIterator<DataSegment> 
filterDataSegmentIteratorByInterval(
       ResultIterator<DataSegment> resultIterator,
       final Collection<Interval> intervals,
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
 b/server/src/main/java/org/apache/druid/metadata/UnusedSegmentKillerConfig.java
similarity index 52%
copy from 
server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
copy to 
server/src/main/java/org/apache/druid/metadata/UnusedSegmentKillerConfig.java
index f53f3108221..082b08251a8 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/UnusedSegmentKillerConfig.java
@@ -22,43 +22,42 @@ package org.apache.druid.metadata;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.common.config.Configs;
-import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
 import org.joda.time.Period;
 
+import javax.annotation.Nullable;
+
 /**
- * Config that dictates polling and caching of segment metadata on leader
- * Coordinator or Overlord services.
+ * Config for {@code UnusedSegmentKiller}. This is used only by the Overlord.
+ * Enabling this config on the Coordinator or other services has no effect.
  */
-public class SegmentsMetadataManagerConfig
+public class UnusedSegmentKillerConfig
 {
-  public static final String CONFIG_PREFIX = "druid.manager.segments";
-
-  @JsonProperty
-  private final Period pollDuration;
+  @JsonProperty("enabled")
+  private final boolean enabled;
 
-  @JsonProperty
-  private final SegmentMetadataCache.UsageMode useIncrementalCache;
+  @JsonProperty("bufferPeriod")
+  private final Period bufferPeriod;
 
   @JsonCreator
-  public SegmentsMetadataManagerConfig(
-      @JsonProperty("pollDuration") Period pollDuration,
-      @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode 
useIncrementalCache
+  public UnusedSegmentKillerConfig(
+      @JsonProperty("enabled") @Nullable Boolean enabled,
+      @JsonProperty("bufferPeriod") @Nullable Period bufferPeriod
   )
   {
-    this.pollDuration = Configs.valueOrDefault(pollDuration, 
Period.minutes(1));
-    this.useIncrementalCache = Configs.valueOrDefault(useIncrementalCache, 
SegmentMetadataCache.UsageMode.NEVER);
+    this.enabled = Configs.valueOrDefault(enabled, false);
+    this.bufferPeriod = Configs.valueOrDefault(bufferPeriod, Period.days(30));
   }
 
   /**
-   * Usage mode of the incremental cache.
+   * Period for which segments are retained even after being marked as unused.
    */
-  public SegmentMetadataCache.UsageMode getCacheUsageMode()
+  public Period getBufferPeriod()
   {
-    return useIncrementalCache;
+    return bufferPeriod;
   }
 
-  public Period getPollDuration()
+  public boolean isEnabled()
   {
-    return pollDuration;
+    return enabled;
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java
 
b/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java
index 3d19d7c7365..cc949a5dd6e 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java
@@ -22,6 +22,7 @@ package org.apache.druid.metadata.segment;
 import org.apache.druid.discovery.DruidLeaderSelector;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.metadata.SqlSegmentsMetadataQuery;
 import org.apache.druid.metadata.segment.cache.DatasourceSegmentCache;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.server.http.DataSegmentPlus;
@@ -118,6 +119,12 @@ class CachedSegmentMetadataTransaction implements 
SegmentMetadataTransaction
     return delegate.getHandle();
   }
 
+  @Override
+  public SqlSegmentsMetadataQuery noCacheSql()
+  {
+    return delegate.noCacheSql();
+  }
+
   @Override
   public void setRollbackOnly()
   {
diff --git 
a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
 
b/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
index cb02f7c8f38..924a8c3c54b 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.metadata.segment;
 
+import org.apache.druid.metadata.SqlSegmentsMetadataQuery;
 import org.skife.jdbi.v2.Handle;
 
 import java.io.Closeable;
@@ -36,6 +37,11 @@ public interface SegmentMetadataReadTransaction
    */
   Handle getHandle();
 
+  /**
+   * @return SQL tool to read or update the metadata store directly.
+   */
+  SqlSegmentsMetadataQuery noCacheSql();
+
   /**
    * Completes the transaction by either committing it or rolling it back.
    */
diff --git 
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransaction.java
 
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransaction.java
index 116832dfe31..5c9938f8c5a 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransaction.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransaction.java
@@ -96,6 +96,12 @@ class SqlSegmentMetadataTransaction implements 
SegmentMetadataTransaction
     return handle;
   }
 
+  @Override
+  public SqlSegmentsMetadataQuery noCacheSql()
+  {
+    return query;
+  }
+
   @Override
   public void setRollbackOnly()
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 64d81a066f9..26b34a36845 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -67,10 +67,10 @@ import java.util.concurrent.ConcurrentHashMap;
  * The datasources to be killed during each cycle are selected from {@link 
#datasourceCircularKillList}. This state is
  * refreshed in a run if the set of datasources to be killed changes. 
Consecutive duplicate datasources are avoided
  * across runs, provided there are other datasources to be killed.
- * </p>
- * <p>
- * See {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}.
- * </p>
+ *
+ * @see org.apache.druid.indexing.common.task.KillUnusedSegmentsTask for 
details
+ * of the actual kill task and {@code UnusedSegmentsKiller} to run embedded kill
+ * tasks on the Overlord.
  */
 public class KillUnusedSegments implements CoordinatorDuty
 {
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
index 48656aa93e9..72ee7a9a169 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java
@@ -101,7 +101,7 @@ public class 
IndexerSQLMetadataStorageCoordinatorReadOnlyTest extends IndexerSql
     cachePollExecutor = new BlockingExecutorService("test-cache-poll-exec");
     segmentMetadataCache = new HeapMemorySegmentMetadataCache(
         mapper,
-        () -> new SegmentsMetadataManagerConfig(null, cacheMode),
+        () -> new SegmentsMetadataManagerConfig(null, cacheMode, null),
         derbyConnectorRule.metadataTablesConfigSupplier(),
         new NoopSegmentSchemaCache(),
         derbyConnector,
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 9a1b7ce63ed..db5c23b8d30 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -151,7 +151,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
 
     segmentMetadataCache = new HeapMemorySegmentMetadataCache(
         mapper,
-        () -> new SegmentsMetadataManagerConfig(null, cacheMode),
+        () -> new SegmentsMetadataManagerConfig(null, cacheMode, null),
         derbyConnectorRule.metadataTablesConfigSupplier(),
         new NoopSegmentSchemaCache(),
         derbyConnector,
@@ -1831,6 +1831,99 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     Assert.assertTrue(SEGMENTS.containsAll(retreivedUnusedSegments));
   }
 
+  @Test
+  public void testRetrieveUnusedSegmentsWithExactInterval()
+  {
+    final String dataSource = defaultSegment.getDataSource();
+    coordinator.commitSegments(Set.of(defaultSegment, defaultSegment2, 
defaultSegment3), null);
+
+    final DateTime now = DateTimes.nowUtc();
+    markAllSegmentsUnused(Set.of(defaultSegment, defaultSegment2, 
defaultSegment3), now.minusHours(1));
+
+    // Verify that query for overlapping interval does not return the segments
+    Assert.assertTrue(
+        coordinator.retrieveUnusedSegmentsWithExactInterval(
+            dataSource,
+            Intervals.ETERNITY,
+            now,
+            10
+        ).isEmpty()
+    );
+
+    // Verify that query for exact interval returns the segments
+    Assert.assertEquals(
+        List.of(defaultSegment3),
+        coordinator.retrieveUnusedSegmentsWithExactInterval(
+            dataSource,
+            defaultSegment3.getInterval(),
+            now,
+            10
+        )
+    );
+
+    Assert.assertEquals(defaultSegment.getInterval(), 
defaultSegment2.getInterval());
+    Assert.assertEquals(
+        Set.of(defaultSegment, defaultSegment2),
+        Set.copyOf(
+            coordinator.retrieveUnusedSegmentsWithExactInterval(
+                dataSource,
+                defaultSegment.getInterval(),
+                now,
+                10
+            )
+        )
+    );
+
+    // Verify that query with limit 1 returns only 1 result
+    Assert.assertEquals(
+        1,
+        coordinator.retrieveUnusedSegmentsWithExactInterval(
+            dataSource,
+            defaultSegment.getInterval(),
+            now,
+            1
+        ).size()
+    );
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentIntervals()
+  {
+    final String dataSource = defaultSegment.getDataSource();
+    coordinator.commitSegments(Set.of(defaultSegment, defaultSegment3), null);
+
+    Assert.assertTrue(coordinator.retrieveUnusedSegmentIntervals(dataSource, 
100).isEmpty());
+
+    markAllSegmentsUnused(Set.of(defaultSegment), 
DateTimes.nowUtc().minusHours(1));
+    Assert.assertEquals(
+        List.of(defaultSegment.getInterval()),
+        coordinator.retrieveUnusedSegmentIntervals(dataSource, 100)
+    );
+
+    markAllSegmentsUnused(Set.of(defaultSegment3), 
DateTimes.nowUtc().minusHours(1));
+    Assert.assertEquals(
+        Set.of(defaultSegment.getInterval(), defaultSegment3.getInterval()),
+        Set.copyOf(coordinator.retrieveUnusedSegmentIntervals(dataSource, 100))
+    );
+
+    // Verify retrieve with limit 1 returns only 1 interval
+    Assert.assertEquals(
+        1,
+        coordinator.retrieveUnusedSegmentIntervals(dataSource, 1).size()
+    );
+  }
+
+  @Test
+  public void testRetrieveAllDatasourceNames()
+  {
+    coordinator.commitSegments(Set.of(defaultSegment), null);
+    coordinator.commitSegments(Set.of(hugeTimeRangeSegment1), null);
+    Assert.assertEquals(
+        Set.of("fooDataSource", "hugeTimeRangeDataSource"),
+        coordinator.retrieveAllDatasourceNames()
+    );
+  }
+
   @Test
   public void testUsedOverlapLow()
   {
@@ -3742,7 +3835,11 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
 
     SegmentId highestUnusedId = 
transactionFactory.inReadWriteDatasourceTransaction(
         TestDataSource.WIKI,
-        transaction -> 
transaction.findHighestUnusedSegmentId(Intervals.of("2024/2025"), "v1")
+        transaction -> transaction.noCacheSql().retrieveHighestUnusedSegmentId(
+            TestDataSource.WIKI,
+            Intervals.of("2024/2025"),
+            "v1"
+        )
     );
     Assert.assertEquals(
         unusedSegmentForExactIntervalAndVersion.getId(),
diff --git 
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
index fb310430ac3..44812c3acbf 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java
@@ -46,7 +46,7 @@ public class SqlSegmentsMetadataManagerProviderTest
   public void testLifecycleStartCreatesSegmentTables() throws Exception
   {
     final TestDerbyConnector connector = derbyConnectorRule.getConnector();
-    final SegmentsMetadataManagerConfig config = new 
SegmentsMetadataManagerConfig(null, null);
+    final SegmentsMetadataManagerConfig config = new 
SegmentsMetadataManagerConfig(null, null, null);
     final Lifecycle lifecycle = new Lifecycle();
     final SegmentSchemaCache segmentSchemaCache = new SegmentSchemaCache();
     SqlSegmentsMetadataManagerProvider provider = new 
SqlSegmentsMetadataManagerProvider(
diff --git 
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java
 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java
index b1d8335a227..8e7e6a7fc5b 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java
@@ -109,7 +109,7 @@ public class SqlSegmentsMetadataManagerSchemaPollTest 
extends SqlSegmentsMetadat
 
     CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
         = CentralizedDatasourceSchemaConfig.enabled(true);
-    config = new SegmentsMetadataManagerConfig(Period.seconds(3), null);
+    config = new SegmentsMetadataManagerConfig(Period.seconds(3), null, null);
     sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
         jsonMapper,
         Suppliers.ofInstance(config),
@@ -193,7 +193,7 @@ public class SqlSegmentsMetadataManagerSchemaPollTest 
extends SqlSegmentsMetadat
 
     CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
         = CentralizedDatasourceSchemaConfig.enabled(true);
-    config = new SegmentsMetadataManagerConfig(Period.seconds(3), null);
+    config = new SegmentsMetadataManagerConfig(Period.seconds(3), null, null);
     sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
         jsonMapper,
         Suppliers.ofInstance(config),
diff --git 
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
index 987d93dcd27..9e1ba861e1e 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
@@ -384,7 +384,8 @@ public class SqlSegmentsMetadataManagerTest extends 
SqlSegmentsMetadataManagerTe
     final Interval theInterval = 
Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000");
 
     // Re-create SqlSegmentsMetadataManager with a higher poll duration
-    final SegmentsMetadataManagerConfig config = new 
SegmentsMetadataManagerConfig(Period.seconds(1), null);
+    final SegmentsMetadataManagerConfig config =
+        new SegmentsMetadataManagerConfig(Period.seconds(1), null, null);
     sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
         jsonMapper,
         Suppliers.ofInstance(config),
diff --git 
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java
 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java
index c35488c62eb..bedc41f11e8 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java
@@ -72,7 +72,7 @@ public class SqlSegmentsMetadataManagerTestBase
 
   protected void setUp(TestDerbyConnector.DerbyConnectorRule 
derbyConnectorRule) throws Exception
   {
-    config = new SegmentsMetadataManagerConfig(Period.seconds(3), null);
+    config = new SegmentsMetadataManagerConfig(Period.seconds(3), null, null);
     connector = derbyConnectorRule.getConnector();
     storageConfig = derbyConnectorRule.metadataTablesConfigSupplier().get();
 
diff --git 
a/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
 
b/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
index e5554b931ee..08098decdab 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java
@@ -88,7 +88,7 @@ public class SqlSegmentsMetadataManagerV2Test extends 
SqlSegmentsMetadataManager
     segmentMetadataCacheExec = new BlockingExecutorService("test");
     SegmentMetadataCache segmentMetadataCache = new 
HeapMemorySegmentMetadataCache(
         jsonMapper,
-        Suppliers.ofInstance(new 
SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
+        Suppliers.ofInstance(new 
SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode, null)),
         Suppliers.ofInstance(storageConfig),
         useSchemaCache ? new SegmentSchemaCache() : new 
NoopSegmentSchemaCache(),
         connector,
diff --git 
a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
 
b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
index 2a5940b0495..fab64152ec8 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
@@ -117,7 +117,7 @@ public class HeapMemorySegmentMetadataCacheTest
       throw new ISE("Test target has already been initialized with 
caching[%s]", cache.isEnabled());
     }
     final SegmentsMetadataManagerConfig metadataManagerConfig
-        = new SegmentsMetadataManagerConfig(null, cacheMode);
+        = new SegmentsMetadataManagerConfig(null, cacheMode, null);
     schemaCache = useSchemaCache ? new SegmentSchemaCache() : new 
NoopSegmentSchemaCache();
     cache = new HeapMemorySegmentMetadataCache(
         TestHelper.JSON_MAPPER,
diff --git 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index d1b4cdfb7a6..0dea1be5d1a 100644
--- 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -132,7 +132,7 @@ public class CoordinatorSegmentMetadataCacheTest extends 
CoordinatorSegmentMetad
     Mockito.when(segmentsMetadataManager.getRecentDataSourcesSnapshot())
            .thenReturn(DataSourcesSnapshot.fromUsedSegments(List.of()));
     SegmentsMetadataManagerConfig metadataManagerConfig =
-        new SegmentsMetadataManagerConfig(Period.millis(10), null);
+        new SegmentsMetadataManagerConfig(Period.millis(10), null, null);
     segmentsMetadataManagerConfigSupplier = 
Suppliers.ofInstance(metadataManagerConfig);
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java
index d84cbcff6ef..d526ccfaf6f 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java
@@ -23,16 +23,21 @@ import org.apache.druid.discovery.DruidLeaderSelector;
 
 import javax.annotation.Nullable;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestDruidLeaderSelector implements DruidLeaderSelector
 {
+  private final AtomicInteger localTerm = new AtomicInteger(0);
   private final AtomicBoolean isLeader = new AtomicBoolean(false);
   private volatile Listener listener;
 
   public void becomeLeader()
   {
-    if (isLeader.compareAndSet(false, true) && listener != null) {
-      listener.becomeLeader();
+    if (isLeader.compareAndSet(false, true)) {
+      if (listener != null) {
+        listener.becomeLeader();
+      }
+      localTerm.incrementAndGet();
     }
   }
 
@@ -59,7 +64,7 @@ public class TestDruidLeaderSelector implements 
DruidLeaderSelector
   @Override
   public int localTerm()
   {
-    return 0;
+    return localTerm.get();
   }
 
   @Override
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java 
b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 95fa654674d..ef4901c68c3 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -93,6 +93,7 @@ import 
org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.indexing.overlord.duty.OverlordDuty;
 import org.apache.druid.indexing.overlord.duty.TaskLogAutoCleaner;
 import org.apache.druid.indexing.overlord.duty.TaskLogAutoCleanerConfig;
+import org.apache.druid.indexing.overlord.duty.UnusedSegmentsKiller;
 import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
 import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerResource;
 import org.apache.druid.indexing.overlord.http.OverlordCompactionResource;
@@ -249,6 +250,8 @@ public class CliOverlord extends ServerRunnable
             binder.bind(ShuffleClient.class).toProvider(Providers.of(null));
             binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(new 
NoopChatHandlerProvider()));
 
+            CliPeon.bindDataSegmentKiller(binder);
+
             PolyBind.createChoice(
                 binder,
                 "druid.indexer.task.rowIngestionMeters.type",
@@ -445,9 +448,9 @@ public class CliOverlord extends ServerRunnable
           private void configureOverlordHelpers(Binder binder)
           {
             JsonConfigProvider.bind(binder, "druid.indexer.logs.kill", 
TaskLogAutoCleanerConfig.class);
-            Multibinder.newSetBinder(binder, OverlordDuty.class)
-                       .addBinding()
-                       .to(TaskLogAutoCleaner.class);
+            final Multibinder<OverlordDuty> dutyBinder = 
Multibinder.newSetBinder(binder, OverlordDuty.class);
+            dutyBinder.addBinding().to(TaskLogAutoCleaner.class);
+            
dutyBinder.addBinding().to(UnusedSegmentsKiller.class).in(LazySingleton.class);
           }
         },
         new IndexingServiceInputSourceModule(),
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java 
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index e9cbab108d3..9cb939f9b89 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -468,14 +468,19 @@ public class CliPeon extends GuiceRunnable
   static void bindPeonDataSegmentHandlers(Binder binder)
   {
     // Build it to make it bind even if nothing binds to it.
-    Binders.dataSegmentKillerBinder(binder);
-    
binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
+    bindDataSegmentKiller(binder);
     Binders.dataSegmentMoverBinder(binder);
     
binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);
     Binders.dataSegmentArchiverBinder(binder);
     
binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);
   }
 
+  static void bindDataSegmentKiller(Binder binder)
+  {
+    Binders.dataSegmentKillerBinder(binder);
+    
binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
+  }
+
   private static void configureTaskActionClient(Binder binder)
   {
     binder.bind(TaskActionClientFactory.class)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to