This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d8e0f28f3 Refactor: Cleanup coordinator duties for metadata cleanup 
(#14631)
2d8e0f28f3 is described below

commit 2d8e0f28f36f83e091911cc7b36a57b10fd065cc
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Aug 5 13:08:23 2023 +0530

    Refactor: Cleanup coordinator duties for metadata cleanup (#14631)
    
    Changes
    - Add abstract class `MetadataCleanupDuty`
    - Make `KillAuditLog`, `KillCompactionConfig`, etc. extend `MetadataCleanupDuty`
    - Improve log and error messages
    - Clean up tests
    - No functional change
---
 docs/development/modules.md                        |   4 +-
 .../test-groups/custom-coordinator-duties          |   4 +-
 .../druid/server/coordinator/DruidCoordinator.java |   1 -
 .../coordinator/DruidCoordinatorRuntimeParams.java |  20 ---
 .../server/coordinator/duty/KillAuditLog.java      |  57 ++------
 .../coordinator/duty/KillCompactionConfig.java     | 160 +++++++++------------
 .../coordinator/duty/KillDatasourceMetadata.java   |  92 ++++--------
 .../druid/server/coordinator/duty/KillRules.java   |  62 +++-----
 .../server/coordinator/duty/KillSupervisors.java   |  61 ++------
 .../duty/KillSupervisorsCustomDuty.java            |  72 +++++-----
 .../coordinator/duty/MetadataCleanupDuty.java      | 135 +++++++++++++++++
 .../druid/server/coordinator/stats/Stats.java      |  14 ++
 .../coordinator/BalanceSegmentsProfiler.java       |   1 -
 .../server/coordinator/DruidCoordinatorTest.java   |   6 +-
 .../server/coordinator/duty/KillAuditLogTest.java  |  49 ++++---
 .../coordinator/duty/KillCompactionConfigTest.java |  88 +++++-------
 .../duty/KillDatasourceMetadataTest.java           |  72 ++++++----
 .../server/coordinator/duty/KillRulesTest.java     |  47 +++---
 .../duty/KillSupervisorsCustomDutyTest.java        |  63 +++++---
 .../coordinator/duty/KillSupervisorsTest.java      |  49 ++++---
 20 files changed, 547 insertions(+), 510 deletions(-)

diff --git a/docs/development/modules.md b/docs/development/modules.md
index a0d2335194..75f4bbbe54 100644
--- a/docs/development/modules.md
+++ b/docs/development/modules.md
@@ -351,12 +351,12 @@ This config file adds the configs below to enable a 
custom coordinator duty.
 ```
 druid.coordinator.dutyGroups=["cleanupMetadata"]
 druid.coordinator.cleanupMetadata.duties=["killSupervisors"]
-druid.coordinator.cleanupMetadata.duty.killSupervisors.retainDuration=PT0M
+druid.coordinator.cleanupMetadata.duty.killSupervisors.durationToRetain=PT0M
 druid.coordinator.cleanupMetadata.period=PT10S
 ```
 
 These configurations create a custom coordinator duty group called 
`cleanupMetadata` which runs a custom coordinator duty called `killSupervisors` 
every 10 seconds.
-The custom coordinator duty `killSupervisors` also has a config called 
`retainDuration` which is set to 0 minute.
+The custom coordinator duty `killSupervisors` also has a config called 
`durationToRetain` which is set to 0 minute.
 
 ### Routing data through a HTTP proxy for your extension
 
diff --git 
a/integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties
 
b/integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties
index 5c75c4197d..cea6370c29 100644
--- 
a/integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties
+++ 
b/integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties
@@ -20,12 +20,12 @@
 # If you are making a change in load list below, make the necessary changes in 
github actions too
 
druid_extensions_loadList=["druid-kafka-indexing-service","mysql-metadata-storage","druid-s3-extensions","druid-basic-security","simple-client-sslcontext","druid-testing-tools","druid-lookups-cached-global","druid-histogram","druid-datasketches"]
 
-druid_coordinator_period_metadataStoreManagementPeriod=PT1H
+druid_coordinator_period_metadataStoreManagementPeriod=PT5S
 druid_sql_planner_authorizeSystemTablesDirectly=false
 
 #Testing kill supervisor custom coordinator duty
 druid_coordinator_kill_supervisor_on=false
 druid_coordinator_dutyGroups=["cleanupMetadata"]
 druid_coordinator_cleanupMetadata_duties=["killSupervisors"]
-druid_coordinator_cleanupMetadata_duty_killSupervisors_retainDuration=PT0M
+druid_coordinator_cleanupMetadata_duty_killSupervisors_durationToRetain=PT0M
 druid_coordinator_cleanupMetadata_period=PT10S
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index f8ea8ef24f..272de19010 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -699,7 +699,6 @@ public class DruidCoordinator
                 
.withSnapshotOfDataSourcesWithAllUsedSegments(dataSourcesSnapshot)
                 .withDynamicConfigs(getDynamicConfigs())
                 .withCompactionConfig(getCompactionConfig())
-                .withEmitter(emitter)
                 .build();
         log.info(
             "Initialized run params for group [%s] with [%,d] used segments in 
[%d] datasources.",
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index ed1c57dbb1..79173683fa 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.DataSourcesSnapshot;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
@@ -68,7 +67,6 @@ public class DruidCoordinatorRuntimeParams
   private final StrategicSegmentAssigner segmentAssigner;
   private final @Nullable TreeSet<DataSegment> usedSegments;
   private final @Nullable DataSourcesSnapshot dataSourcesSnapshot;
-  private final ServiceEmitter emitter;
   private final CoordinatorDynamicConfig coordinatorDynamicConfig;
   private final CoordinatorCompactionConfig coordinatorCompactionConfig;
   private final SegmentLoadingConfig segmentLoadingConfig;
@@ -83,7 +81,6 @@ public class DruidCoordinatorRuntimeParams
       StrategicSegmentAssigner segmentAssigner,
       @Nullable TreeSet<DataSegment> usedSegments,
       @Nullable DataSourcesSnapshot dataSourcesSnapshot,
-      ServiceEmitter emitter,
       CoordinatorDynamicConfig coordinatorDynamicConfig,
       CoordinatorCompactionConfig coordinatorCompactionConfig,
       SegmentLoadingConfig segmentLoadingConfig,
@@ -98,7 +95,6 @@ public class DruidCoordinatorRuntimeParams
     this.segmentAssigner = segmentAssigner;
     this.usedSegments = usedSegments;
     this.dataSourcesSnapshot = dataSourcesSnapshot;
-    this.emitter = emitter;
     this.coordinatorDynamicConfig = coordinatorDynamicConfig;
     this.coordinatorCompactionConfig = coordinatorCompactionConfig;
     this.segmentLoadingConfig = segmentLoadingConfig;
@@ -145,11 +141,6 @@ public class DruidCoordinatorRuntimeParams
     return usedSegments;
   }
 
-  public ServiceEmitter getEmitter()
-  {
-    return emitter;
-  }
-
   public CoordinatorDynamicConfig getCoordinatorDynamicConfig()
   {
     return coordinatorDynamicConfig;
@@ -200,7 +191,6 @@ public class DruidCoordinatorRuntimeParams
         segmentAssigner,
         usedSegments,
         dataSourcesSnapshot,
-        emitter,
         coordinatorDynamicConfig,
         coordinatorCompactionConfig,
         segmentLoadingConfig,
@@ -219,7 +209,6 @@ public class DruidCoordinatorRuntimeParams
     private StrategicSegmentAssigner segmentAssigner;
     private @Nullable TreeSet<DataSegment> usedSegments;
     private @Nullable DataSourcesSnapshot dataSourcesSnapshot;
-    private ServiceEmitter emitter;
     private CoordinatorDynamicConfig coordinatorDynamicConfig;
     private CoordinatorCompactionConfig coordinatorCompactionConfig;
     private SegmentLoadingConfig segmentLoadingConfig;
@@ -242,7 +231,6 @@ public class DruidCoordinatorRuntimeParams
         StrategicSegmentAssigner segmentAssigner,
         @Nullable TreeSet<DataSegment> usedSegments,
         @Nullable DataSourcesSnapshot dataSourcesSnapshot,
-        ServiceEmitter emitter,
         CoordinatorDynamicConfig coordinatorDynamicConfig,
         CoordinatorCompactionConfig coordinatorCompactionConfig,
         SegmentLoadingConfig segmentLoadingConfig,
@@ -257,7 +245,6 @@ public class DruidCoordinatorRuntimeParams
       this.segmentAssigner = segmentAssigner;
       this.usedSegments = usedSegments;
       this.dataSourcesSnapshot = dataSourcesSnapshot;
-      this.emitter = emitter;
       this.coordinatorDynamicConfig = coordinatorDynamicConfig;
       this.coordinatorCompactionConfig = coordinatorCompactionConfig;
       this.segmentLoadingConfig = segmentLoadingConfig;
@@ -279,7 +266,6 @@ public class DruidCoordinatorRuntimeParams
           segmentAssigner,
           usedSegments,
           dataSourcesSnapshot,
-          emitter,
           coordinatorDynamicConfig,
           coordinatorCompactionConfig,
           segmentLoadingConfig,
@@ -369,12 +355,6 @@ public class DruidCoordinatorRuntimeParams
       return this;
     }
 
-    public Builder withEmitter(ServiceEmitter emitter)
-    {
-      this.emitter = emitter;
-      return this;
-    }
-
     public Builder withDynamicConfigs(CoordinatorDynamicConfig configs)
     {
       this.coordinatorDynamicConfig = configs;
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
index ac735c6594..754bec1d4f 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
@@ -19,23 +19,14 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import org.apache.druid.audit.AuditManager;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
 
-public class KillAuditLog implements CoordinatorDuty
+public class KillAuditLog extends MetadataCleanupDuty
 {
-  private static final Logger log = new Logger(KillAuditLog.class);
-
-  private final long period;
-  private final long retainDuration;
-  private long lastKillTime = 0;
-
   private final AuditManager auditManager;
 
   @Inject
@@ -44,44 +35,20 @@ public class KillAuditLog implements CoordinatorDuty
       DruidCoordinatorConfig config
   )
   {
-    this.period = config.getCoordinatorAuditKillPeriod().getMillis();
-    Preconditions.checkArgument(
-        this.period >= 
config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
-        "coordinator audit kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod"
-    );
-    this.retainDuration = 
config.getCoordinatorAuditKillDurationToRetain().getMillis();
-    Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit 
kill retainDuration must be >= 0");
-    Preconditions.checkArgument(this.retainDuration < 
System.currentTimeMillis(), "Coordinator audit kill retainDuration cannot be 
greater than current time in ms");
-    log.debug(
-        "Audit Kill Task scheduling enabled with period [%s], retainDuration 
[%s]",
-        this.period,
-        this.retainDuration
+    super(
+        "audit logs",
+        "druid.coordinator.kill.audit",
+        config.getCoordinatorAuditKillPeriod(),
+        config.getCoordinatorAuditKillDurationToRetain(),
+        Stats.Kill.AUDIT_LOGS,
+        config
     );
     this.auditManager = auditManager;
   }
 
   @Override
-  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
+  protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
   {
-    long currentTimeMillis = System.currentTimeMillis();
-    if ((lastKillTime + period) < currentTimeMillis) {
-      lastKillTime = currentTimeMillis;
-      long timestamp = currentTimeMillis - retainDuration;
-      try {
-        int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp);
-        ServiceEmitter emitter = params.getEmitter();
-        emitter.emit(
-            new ServiceMetricEvent.Builder().build(
-                "metadata/kill/audit/count",
-                auditRemoved
-            )
-        );
-        log.info("Finished running KillAuditLog duty. Removed %,d audit logs", 
auditRemoved);
-      }
-      catch (Exception e) {
-        log.error(e, "Failed to kill audit log");
-      }
-    }
-    return params;
+    return auditManager.removeAuditLogsOlderThan(minCreatedTime.getMillis());
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
index 74645b51b4..4b92cfa9b9 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import org.apache.druid.audit.AuditInfo;
@@ -28,15 +27,15 @@ import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.java.util.RetryableException;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.MetadataStorageConnector;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
 
 import java.util.Map;
 import java.util.Set;
@@ -48,16 +47,11 @@ import java.util.stream.Collectors;
  * Note that this will delete compaction configuration for inactive datasources
  * (datasource with no used and unused segments) immediately.
  */
-public class KillCompactionConfig implements CoordinatorDuty
+public class KillCompactionConfig extends MetadataCleanupDuty
 {
   private static final Logger log = new Logger(KillCompactionConfig.class);
   private static final int UPDATE_NUM_RETRY = 5;
 
-  static final String COUNT_METRIC = "metadata/kill/compaction/count";
-
-  private final long period;
-  private long lastKillTime = 0;
-
   private final JacksonConfigManager jacksonConfigManager;
   private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
   private final MetadataStorageConnector connector;
@@ -72,99 +66,85 @@ public class KillCompactionConfig implements CoordinatorDuty
       MetadataStorageTablesConfig connectorConfig
   )
   {
+    super(
+        "compaction configs",
+        "druid.coordinator.kill.compaction",
+        config.getCoordinatorCompactionKillPeriod(),
+        Duration.millis(1), // Retain duration is ignored
+        Stats.Kill.COMPACTION_CONFIGS,
+        config
+    );
     this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
     this.jacksonConfigManager = jacksonConfigManager;
-    this.period = config.getCoordinatorCompactionKillPeriod().getMillis();
     this.connector = connector;
     this.connectorConfig = connectorConfig;
-    Preconditions.checkArgument(
-        this.period >= 
config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
-        "Coordinator compaction configuration kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod"
-    );
-    log.debug(
-        "Compaction Configuration Kill Task scheduling enabled with period 
[%s]",
-        this.period
-    );
   }
 
   @Override
-  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
+  protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
   {
-    long currentTimeMillis = System.currentTimeMillis();
-    if ((lastKillTime + period) < currentTimeMillis) {
-      lastKillTime = currentTimeMillis;
-      try {
-        RetryUtils.retry(
-            () -> {
-              final byte[] currentBytes = 
CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig);
-              final CoordinatorCompactionConfig current = 
CoordinatorCompactionConfig.convertByteToConfig(jacksonConfigManager, 
currentBytes);
-              // If current compaction config is empty then there is nothing 
to do
-              if (CoordinatorCompactionConfig.empty().equals(current)) {
-                log.info(
-                    "Finished running KillCompactionConfig duty. Nothing to do 
as compaction config is already empty.");
-                emitMetric(params.getEmitter(), 0);
-                return ConfigManager.SetResult.ok();
-              }
-
-              // Get all active datasources
-              // Note that we get all active datasources after getting 
compaction config to prevent race condition if new
-              // datasource and config are added.
-              Set<String> activeDatasources = 
sqlSegmentsMetadataManager.retrieveAllDataSourceNames();
-              final Map<String, DataSourceCompactionConfig> updated = current
-                  .getCompactionConfigs()
-                  .stream()
-                  .filter(dataSourceCompactionConfig -> 
activeDatasources.contains(dataSourceCompactionConfig.getDataSource()))
-                  
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, 
Function.identity()));
-
-              // Calculate number of compaction configs to remove for logging
-              int compactionConfigRemoved = 
current.getCompactionConfigs().size() - updated.size();
-
-              ConfigManager.SetResult result = jacksonConfigManager.set(
-                  CoordinatorCompactionConfig.CONFIG_KEY,
-                  currentBytes,
-                  CoordinatorCompactionConfig.from(current, 
ImmutableList.copyOf(updated.values())),
-                  new AuditInfo(
-                      "KillCompactionConfig",
-                      "CoordinatorDuty for automatic deletion of compaction 
config",
-                      ""
-                  )
-              );
-              if (result.isOk()) {
-                log.info(
-                    "Finished running KillCompactionConfig duty. Removed %,d 
compaction configs",
-                    compactionConfigRemoved
-                );
-                emitMetric(params.getEmitter(), compactionConfigRemoved);
-              } else if (result.isRetryable()) {
-                // Failed but is retryable
-                log.debug("Retrying KillCompactionConfig duty");
-                throw new RetryableException(result.getException());
-              } else {
-                // Failed and not retryable
-                log.error(result.getException(), "Failed to kill compaction 
configurations");
-                emitMetric(params.getEmitter(), 0);
-              }
-              return result;
-            },
-            e -> e instanceof RetryableException,
-            UPDATE_NUM_RETRY
-        );
-      }
-      catch (Exception e) {
-        log.error(e, "Failed to kill compaction configurations");
-        emitMetric(params.getEmitter(), 0);
-      }
+    try {
+      return RetryUtils.retry(
+          this::tryDeleteCompactionConfigs,
+          e -> e instanceof RetryableException,
+          UPDATE_NUM_RETRY
+      );
+    }
+    catch (Exception e) {
+      log.error(e, "Failed to kill compaction configurations");
+      return 0;
     }
-    return params;
   }
 
-  private void emitMetric(ServiceEmitter emitter, int compactionConfigRemoved)
+  /**
+   * Tries to delete compaction configs for inactive datasources and returns
+   * the number of compaction configs successfully removed.
+   */
+  private int tryDeleteCompactionConfigs() throws RetryableException
   {
-    emitter.emit(
-        new ServiceMetricEvent.Builder().build(
-            COUNT_METRIC,
-            compactionConfigRemoved
+    final byte[] currentBytes = 
CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig);
+    final CoordinatorCompactionConfig current = 
CoordinatorCompactionConfig.convertByteToConfig(
+        jacksonConfigManager,
+        currentBytes
+    );
+    // If current compaction config is empty then there is nothing to do
+    if (CoordinatorCompactionConfig.empty().equals(current)) {
+      log.info("Nothing to do as compaction config is already empty.");
+      return 0;
+    }
+
+    // Get all active datasources
+    // Note that we get all active datasources after getting compaction config 
to prevent race condition if new
+    // datasource and config are added.
+    Set<String> activeDatasources = 
sqlSegmentsMetadataManager.retrieveAllDataSourceNames();
+    final Map<String, DataSourceCompactionConfig> updated = current
+        .getCompactionConfigs()
+        .stream()
+        .filter(dataSourceCompactionConfig -> 
activeDatasources.contains(dataSourceCompactionConfig.getDataSource()))
+        .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, 
Function.identity()));
+
+    // Calculate number of compaction configs removed
+    int compactionConfigRemoved = current.getCompactionConfigs().size() - 
updated.size();
+
+    ConfigManager.SetResult result = jacksonConfigManager.set(
+        CoordinatorCompactionConfig.CONFIG_KEY,
+        currentBytes,
+        CoordinatorCompactionConfig.from(current, 
ImmutableList.copyOf(updated.values())),
+        new AuditInfo(
+            "KillCompactionConfig",
+            "CoordinatorDuty for automatic deletion of compaction config",
+            ""
         )
     );
+
+    if (result.isOk()) {
+      return compactionConfigRemoved;
+    } else if (result.isRetryable()) {
+      log.debug("Retrying KillCompactionConfig duty");
+      throw new RetryableException(result.getException());
+    } else {
+      log.error(result.getException(), "Failed to kill compaction 
configurations");
+      return 0;
+    }
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java
index 9c9535b250..7fbf1b1deb 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java
@@ -19,17 +19,14 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.inject.Inject;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
 
 import java.util.Collection;
 import java.util.Map;
@@ -42,14 +39,8 @@ import java.util.stream.Collectors;
  * Note that this class relies on the supervisorSpec.getDataSources names to 
match with the
  * 'datasource' column of the datasource metadata table.
  */
-public class KillDatasourceMetadata implements CoordinatorDuty
+public class KillDatasourceMetadata extends MetadataCleanupDuty
 {
-  private static final Logger log = new Logger(KillDatasourceMetadata.class);
-
-  private final long period;
-  private final long retainDuration;
-  private long lastKillTime = 0;
-
   private final IndexerMetadataStorageCoordinator 
indexerMetadataStorageCoordinator;
   private final MetadataSupervisorManager metadataSupervisorManager;
 
@@ -60,62 +51,37 @@ public class KillDatasourceMetadata implements 
CoordinatorDuty
       MetadataSupervisorManager metadataSupervisorManager
   )
   {
+    super(
+        "datasources",
+        "druid.coordinator.kill.datasource",
+        config.getCoordinatorDatasourceKillPeriod(),
+        config.getCoordinatorDatasourceKillDurationToRetain(),
+        Stats.Kill.DATASOURCES,
+        config
+    );
     this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
     this.metadataSupervisorManager = metadataSupervisorManager;
-    this.period = config.getCoordinatorDatasourceKillPeriod().getMillis();
-    Preconditions.checkArgument(
-        this.period >= 
config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
-        "Coordinator datasource metadata kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod"
-    );
-    this.retainDuration = 
config.getCoordinatorDatasourceKillDurationToRetain().getMillis();
-    Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator 
datasource metadata kill retainDuration must be >= 0");
-    Preconditions.checkArgument(this.retainDuration < 
System.currentTimeMillis(), "Coordinator datasource metadata kill 
retainDuration cannot be greater than current time in ms");
-    log.debug(
-        "Datasource Metadata Kill Task scheduling enabled with period [%s], 
retainDuration [%s]",
-        this.period,
-        this.retainDuration
-    );
   }
 
   @Override
-  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
+  protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
   {
-    long currentTimeMillis = System.currentTimeMillis();
-    if ((lastKillTime + period) < currentTimeMillis) {
-      lastKillTime = currentTimeMillis;
-      long timestamp = currentTimeMillis - retainDuration;
-      try {
-        // Datasource metadata only exists for datasource with supervisor
-        // To determine if datasource metadata is still active, we check if 
the supervisor for that particular datasource
-        // is still active or not
-        Map<String, SupervisorSpec> allActiveSupervisor = 
metadataSupervisorManager.getLatestActiveOnly();
-        Set<String> allDatasourceWithActiveSupervisor = 
allActiveSupervisor.values()
-                                                                           
.stream()
-                                                                           
.map(supervisorSpec -> supervisorSpec.getDataSources())
-                                                                           
.flatMap(Collection::stream)
-                                                                           
.filter(datasource -> !Strings.isNullOrEmpty(datasource))
-                                                                           
.collect(Collectors.toSet());
-        // We exclude removing datasource metadata with active supervisor
-        int datasourceMetadataRemovedCount = 
indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan(
-            timestamp,
-            allDatasourceWithActiveSupervisor
-        );
-        ServiceEmitter emitter = params.getEmitter();
-        emitter.emit(
-            new ServiceMetricEvent.Builder().build(
-                "metadata/kill/datasource/count",
-                datasourceMetadataRemovedCount
-            )
-        );
-        log.info(
-            "Finished running KillDatasourceMetadata duty. Removed %,d 
datasource metadata",
-            datasourceMetadataRemovedCount
-        );
-      }
-      catch (Exception e) {
-        log.error(e, "Failed to kill datasource metadata");
-      }
-    }
-    return params;
+    // Datasource metadata only exists for datasource with supervisor
+    // To determine if datasource metadata is still active, we check if the 
supervisor for that particular datasource
+    // is still active or not
+    Map<String, SupervisorSpec> allActiveSupervisor = 
metadataSupervisorManager.getLatestActiveOnly();
+    Set<String> allDatasourceWithActiveSupervisor
+        = allActiveSupervisor.values()
+                             .stream()
+                             .map(SupervisorSpec::getDataSources)
+                             .flatMap(Collection::stream)
+                             .filter(datasource -> 
!Strings.isNullOrEmpty(datasource))
+                             .collect(Collectors.toSet());
+
+    // We exclude removing datasource metadata with active supervisor
+    return indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan(
+        minCreatedTime.getMillis(),
+        allDatasourceWithActiveSupervisor
+    );
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java
index 50b1740e86..40964959b3 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java
@@ -19,64 +19,36 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
 
-public class KillRules implements CoordinatorDuty
+public class KillRules extends MetadataCleanupDuty
 {
-  private static final Logger log = new Logger(KillRules.class);
-
-  private final long period;
-  private final long retainDuration;
-  private long lastKillTime = 0;
+  private final MetadataRuleManager metadataRuleManager;
 
   @Inject
   public KillRules(
-      DruidCoordinatorConfig config
+      DruidCoordinatorConfig config,
+      MetadataRuleManager metadataRuleManager
   )
   {
-    this.period = config.getCoordinatorRuleKillPeriod().getMillis();
-    Preconditions.checkArgument(
-        this.period >= 
config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
-        "coordinator rule kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod"
-    );
-    this.retainDuration = 
config.getCoordinatorRuleKillDurationToRetain().getMillis();
-    Preconditions.checkArgument(this.retainDuration >= 0, "coordinator rule 
kill retainDuration must be >= 0");
-    Preconditions.checkArgument(this.retainDuration < 
System.currentTimeMillis(), "Coordinator rule kill retainDuration cannot be 
greater than current time in ms");
-    log.debug(
-        "Rule Kill Task scheduling enabled with period [%s], retainDuration 
[%s]",
-        this.period,
-        this.retainDuration
+    super(
+        "rules",
+        "druid.coordinator.kill.rule",
+        config.getCoordinatorRuleKillPeriod(),
+        config.getCoordinatorRuleKillDurationToRetain(),
+        Stats.Kill.RULES,
+        config
     );
+    this.metadataRuleManager = metadataRuleManager;
   }
 
   @Override
-  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
+  protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
   {
-    long currentTimeMillis = System.currentTimeMillis();
-    if ((lastKillTime + period) < currentTimeMillis) {
-      lastKillTime = currentTimeMillis;
-      long timestamp = currentTimeMillis - retainDuration;
-      try {
-        int ruleRemoved = 
params.getDatabaseRuleManager().removeRulesForEmptyDatasourcesOlderThan(timestamp);
-        ServiceEmitter emitter = params.getEmitter();
-        emitter.emit(
-            new ServiceMetricEvent.Builder().build(
-                "metadata/kill/rule/count",
-                ruleRemoved
-            )
-        );
-        log.info("Finished running KillRules duty. Removed %,d rule", 
ruleRemoved);
-      }
-      catch (Exception e) {
-        log.error(e, "Failed to kill rules metadata");
-      }
-    }
-    return params;
+    return 
metadataRuleManager.removeRulesForEmptyDatasourcesOlderThan(minCreatedTime.getMillis());
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
index 3f87f5dee8..7f6c43d5ee 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
@@ -19,26 +19,17 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
 
 /**
- * CoordinatorDuty for automatic deletion of terminated supervisors from the 
supervisor table in metadata storage.
+ * Cleans up terminated supervisors from the supervisors table in metadata 
storage.
  */
-public class KillSupervisors implements CoordinatorDuty
+public class KillSupervisors extends MetadataCleanupDuty
 {
-  private static final Logger log = new Logger(KillSupervisors.class);
-
-  private final long period;
-  private final long retainDuration;
-  private long lastKillTime = 0;
-
   private final MetadataSupervisorManager metadataSupervisorManager;
 
   @Inject
@@ -47,44 +38,20 @@ public class KillSupervisors implements CoordinatorDuty
       MetadataSupervisorManager metadataSupervisorManager
   )
   {
-    this.metadataSupervisorManager = metadataSupervisorManager;
-    this.period = config.getCoordinatorSupervisorKillPeriod().getMillis();
-    Preconditions.checkArgument(
-        this.period >= 
config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
-        "Coordinator supervisor kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod"
-    );
-    this.retainDuration = 
config.getCoordinatorSupervisorKillDurationToRetain().getMillis();
-    Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator 
supervisor kill retainDuration must be >= 0");
-    Preconditions.checkArgument(this.retainDuration < 
System.currentTimeMillis(), "Coordinator supervisor kill retainDuration cannot 
be greater than current time in ms");
-    log.debug(
-        "Supervisor Kill Task scheduling enabled with period [%s], 
retainDuration [%s]",
-        this.period,
-        this.retainDuration
+    super(
+        "supervisors",
+        "druid.coordinator.kill.supervisor",
+        config.getCoordinatorSupervisorKillPeriod(),
+        config.getCoordinatorSupervisorKillDurationToRetain(),
+        Stats.Kill.SUPERVISOR_SPECS,
+        config
     );
+    this.metadataSupervisorManager = metadataSupervisorManager;
   }
 
   @Override
-  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
+  protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
   {
-    long currentTimeMillis = System.currentTimeMillis();
-    if ((lastKillTime + period) < currentTimeMillis) {
-      lastKillTime = currentTimeMillis;
-      long timestamp = currentTimeMillis - retainDuration;
-      try {
-        int supervisorRemoved = 
metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp);
-        ServiceEmitter emitter = params.getEmitter();
-        emitter.emit(
-            new ServiceMetricEvent.Builder().build(
-                "metadata/kill/supervisor/count",
-                supervisorRemoved
-            )
-        );
-        log.info("Finished running KillSupervisors duty. Removed %,d 
supervisor specs", supervisorRemoved);
-      }
-      catch (Exception e) {
-        log.error(e, "Failed to kill terminated supervisor metadata");
-      }
-    }
-    return params;
+    return 
metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(minCreatedTime.getMillis());
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDuty.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDuty.java
index 7a5b1d6970..7d645967be 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDuty.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDuty.java
@@ -22,64 +22,58 @@ package org.apache.druid.server.coordinator.duty;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
+import org.apache.druid.guice.annotations.UnstableApi;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.MetadataSupervisorManager;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
 import org.joda.time.Duration;
 
 /**
- * CoordinatorDuty for automatic deletion of terminated supervisors from the 
supervisor table in metadata storage.
- * This class has the same purpose as {@link KillSupervisors} but uses a 
different configuration style as
- * detailed in {@link CoordinatorCustomDuty}. This class primary purpose is as 
an example to demostrate the usuage
- * of the {@link CoordinatorCustomDuty} {@link 
org.apache.druid.guice.annotations.ExtensionPoint}
- *
- * Production use case should still use {@link KillSupervisors}. In the 
future, we might migrate all metadata
- * management coordinator duties to {@link CoordinatorCustomDuty} but until 
then this class will remains undocumented
- * and should not be use in production.
+ * Example {@link CoordinatorCustomDuty} for automatic deletion of terminated
+ * supervisors from the metadata storage. This duty has the same implementation
+ * as {@link KillSupervisors} but uses a different configuration style as
+ * detailed in {@link CoordinatorCustomDuty}.
+ * <p>
+ * This duty is only an example to demonstrate the usage of coordinator custom
+ * duties. All production clusters should continue using {@link 
KillSupervisors}.
+ * <p>
+ * In the future, we might migrate all metadata management coordinator duties 
to
+ * {@link CoordinatorCustomDuty} but until then this class will remain 
undocumented.
  */
-public class KillSupervisorsCustomDuty implements CoordinatorCustomDuty
+@UnstableApi
+public class KillSupervisorsCustomDuty extends MetadataCleanupDuty implements 
CoordinatorCustomDuty
 {
   private static final Logger log = new 
Logger(KillSupervisorsCustomDuty.class);
 
-  private final Duration retainDuration;
   private final MetadataSupervisorManager metadataSupervisorManager;
 
   @JsonCreator
   public KillSupervisorsCustomDuty(
-      @JsonProperty("retainDuration") Duration retainDuration,
-      @JacksonInject MetadataSupervisorManager metadataSupervisorManager
+      @JsonProperty("durationToRetain") Duration retainDuration,
+      @JacksonInject MetadataSupervisorManager metadataSupervisorManager,
+      @JacksonInject DruidCoordinatorConfig coordinatorConfig
   )
   {
-    this.metadataSupervisorManager = metadataSupervisorManager;
-    this.retainDuration = retainDuration;
-    Preconditions.checkArgument(this.retainDuration != null && 
this.retainDuration.getMillis() >= 0, "(Custom Duty) Coordinator supervisor 
kill retainDuration must be >= 0");
-    log.info(
-        "Supervisor Kill Task scheduling enabled with retainDuration [%s]",
-        this.retainDuration
+    super(
+        "supervisors",
+        "KillSupervisorsCustomDuty",
+        // Use the same period as metadata store management so that validation 
passes
+        // Actual period of custom duties is configured by the user
+        coordinatorConfig.getCoordinatorMetadataStoreManagementPeriod(),
+        retainDuration,
+        Stats.Kill.SUPERVISOR_SPECS,
+        coordinatorConfig
     );
+    this.metadataSupervisorManager = metadataSupervisorManager;
+    log.warn("This is only an example implementation of a custom duty and"
+             + " must not be used in production. Use KillSupervisors duty 
instead.");
   }
 
   @Override
-  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
+  protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
   {
-    long timestamp = System.currentTimeMillis() - retainDuration.getMillis();
-    try {
-      int supervisorRemoved = 
metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp);
-      ServiceEmitter emitter = params.getEmitter();
-      emitter.emit(
-          new ServiceMetricEvent.Builder().build(
-              "metadata/kill/supervisor/count",
-              supervisorRemoved
-          )
-      );
-      log.info("Finished running KillSupervisors duty. Removed %,d supervisor 
specs", supervisorRemoved);
-    }
-    catch (Exception e) {
-      log.error(e, "Failed to kill terminated supervisor metadata");
-    }
-    return params;
+    return 
metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(minCreatedTime.getMillis());
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/MetadataCleanupDuty.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/MetadataCleanupDuty.java
new file mode 100644
index 0000000000..3206f11003
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/MetadataCleanupDuty.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.CoordinatorStat;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+
+/**
+ * Performs cleanup of stale metadata entries created before a configured 
retain duration.
+ * <p>
+ * In every invocation of {@link #run}, the duty checks if the {@link 
#cleanupPeriod}
+ * has elapsed since the {@link #lastCleanupTime}. If it has, then the method
+ * {@link #cleanupEntriesCreatedBefore(DateTime)} is invoked. Otherwise, the 
duty
+ * completes immediately without making any changes.
+ */
+public abstract class MetadataCleanupDuty implements CoordinatorDuty
+{
+  private static final Logger log = new Logger(MetadataCleanupDuty.class);
+
+  private final String propertyPrefix;
+  private final String entryType;
+  private final CoordinatorStat cleanupCountStat;
+
+  private final Duration cleanupPeriod;
+  private final Duration retainDuration;
+
+  private DateTime lastCleanupTime = DateTimes.utc(0);
+
+  protected MetadataCleanupDuty(
+      String entryType,
+      String propertyPrefix,
+      Duration cleanupPeriod,
+      Duration retainDuration,
+      CoordinatorStat cleanupCountStat,
+      DruidCoordinatorConfig coordinatorConfig
+  )
+  {
+    this.propertyPrefix = propertyPrefix;
+    this.entryType = entryType;
+    this.cleanupPeriod = cleanupPeriod;
+    this.retainDuration = retainDuration;
+    this.cleanupCountStat = cleanupCountStat;
+
+    validatePeriod(cleanupPeriod, 
coordinatorConfig.getCoordinatorMetadataStoreManagementPeriod());
+    validateRetainDuration(retainDuration);
+
+    log.debug(
+        "Enabled cleanup of [%s] with period [%s] and durationToRetain [%s].",
+        entryType, cleanupPeriod, retainDuration
+    );
+  }
+
+  @Nullable
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
+  {
+    final DateTime now = DateTimes.nowUtc();
+
+    // Perform cleanup only if cleanup period has elapsed
+    if (lastCleanupTime.plus(cleanupPeriod).isBefore(now)) {
+      lastCleanupTime = now;
+
+      try {
+        DateTime minCreatedTime = now.minus(retainDuration);
+        int deletedEntries = cleanupEntriesCreatedBefore(minCreatedTime);
+        log.info("Removed [%,d] [%s] created before [%s].", deletedEntries, 
entryType, minCreatedTime);
+
+        params.getCoordinatorStats().add(cleanupCountStat, deletedEntries);
+      }
+      catch (Exception e) {
+        log.error(e, "Failed to perform cleanup of [%s]", entryType);
+      }
+    }
+
+    return params;
+  }
+
+  /**
+   * Cleans up metadata entries created before the {@code minCreatedTime}.
+   * <p>
+   * This method is not invoked if the {@link #cleanupPeriod} has not elapsed
+   * since the {@link #lastCleanupTime}.
+   *
+   * @return Number of deleted metadata entries
+   */
+  protected abstract int cleanupEntriesCreatedBefore(DateTime minCreatedTime);
+
+  private void validatePeriod(Duration period, Duration 
metadataManagementPeriod)
+  {
+    Preconditions.checkArgument(
+        period != null && period.getMillis() >= 
metadataManagementPeriod.getMillis(),
+        "[%s.period] must be greater than 
[druid.coordinator.period.metadataStoreManagementPeriod]",
+        propertyPrefix
+    );
+  }
+
+  private void validateRetainDuration(Duration retainDuration)
+  {
+    Preconditions.checkArgument(
+        retainDuration != null && retainDuration.getMillis() >= 0,
+        "[%s.durationToRetain] must be 0 milliseconds or higher",
+        propertyPrefix
+    );
+    Preconditions.checkArgument(
+        retainDuration.getMillis() < System.currentTimeMillis(),
+        "[%s.durationToRetain] cannot be greater than current time in 
milliseconds",
+        propertyPrefix
+    );
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
index 28f5c91049..ac37767327 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
@@ -129,6 +129,20 @@ public class Stats
         = CoordinatorStat.toDebugAndEmit("groupRunTime", 
"coordinator/global/time");
   }
 
+  public static class Kill
+  {
+    public static final CoordinatorStat COMPACTION_CONFIGS
+        = CoordinatorStat.toDebugAndEmit("killedCompactConfigs", 
"metadata/kill/compaction/count");
+    public static final CoordinatorStat SUPERVISOR_SPECS
+        = CoordinatorStat.toDebugAndEmit("killedSupervisorSpecs", 
"metadata/kill/supervisor/count");
+    public static final CoordinatorStat RULES
+        = CoordinatorStat.toDebugAndEmit("killedRules", 
"metadata/kill/rule/count");
+    public static final CoordinatorStat AUDIT_LOGS
+        = CoordinatorStat.toDebugAndEmit("killedAuditLogs", 
"metadata/kill/audit/count");
+    public static final CoordinatorStat DATASOURCES
+        = CoordinatorStat.toDebugAndEmit("killedDatasources", 
"metadata/kill/datasource/count");
+  }
+
   public static class Balancer
   {
     public static final CoordinatorStat COMPUTATION_ERRORS = 
CoordinatorStat.toLogAndEmit(
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
index 5f1e9dd7e8..d1df912698 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
@@ -139,7 +139,6 @@ public class BalanceSegmentsProfiler
                 .build()
         )
         .withSegmentAssignerUsing(loadQueueManager)
-        .withEmitter(emitter)
         .withDatabaseRuleManager(manager)
         .build();
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 15fc5f5ac9..979181619a 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -690,7 +690,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
   @Test
   public void 
testInitializeCompactSegmentsDutyWhenCustomDutyGroupDoesNotContainsCompactSegments()
   {
-    CoordinatorCustomDutyGroup group = new 
CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), 
ImmutableList.of(new KillSupervisorsCustomDuty(new Duration("PT1S"), null)));
+    CoordinatorCustomDutyGroup group = new CoordinatorCustomDutyGroup(
+        "group1",
+        Duration.standardSeconds(1),
+        ImmutableList.of(new KillSupervisorsCustomDuty(new Duration("PT1S"), 
null, druidCoordinatorConfig))
+    );
     CoordinatorCustomDutyGroups customDutyGroups = new 
CoordinatorCustomDutyGroups(ImmutableSet.of(group));
     coordinator = new DruidCoordinator(
         druidCoordinatorConfig,
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
index 8480670160..bec2e3ed4b 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
@@ -20,14 +20,14 @@
 package org.apache.druid.server.coordinator.duty;
 
 import org.apache.druid.audit.AuditManager;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
 import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
@@ -43,13 +43,15 @@ public class KillAuditLogTest
   @Mock
   private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
 
-  @Mock
-  private ServiceEmitter mockServiceEmitter;
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
   private KillAuditLog killAuditLog;
+  private CoordinatorRunStats runStats;
+
+  @Before
+  public void setup()
+  {
+    runStats = new CoordinatorRunStats();
+    
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
+  }
 
   @Test
   public void testRunSkipIfLastRunLessThanPeriod()
@@ -69,7 +71,6 @@ public class KillAuditLogTest
   @Test
   public void testRunNotSkipIfLastRunMoreThanPeriod()
   {
-    
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
     TestDruidCoordinatorConfig druidCoordinatorConfig = new 
TestDruidCoordinatorConfig.Builder()
         .withMetadataStoreManagementPeriod(new Duration("PT5s"))
         .withCoordianatorAuditKillPeriod(new Duration("PT6S"))
@@ -80,7 +81,7 @@ public class KillAuditLogTest
     killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
     killAuditLog.run(mockDruidCoordinatorRuntimeParams);
     
Mockito.verify(mockAuditManager).removeAuditLogsOlderThan(ArgumentMatchers.anyLong());
-    
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+    Assert.assertTrue(runStats.hasStat(Stats.Kill.AUDIT_LOGS));
   }
 
   @Test
@@ -93,9 +94,16 @@ public class KillAuditLogTest
         .withCoordinatorKillMaxSegments(10)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("coordinator audit kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod");
-    killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+
+    final IllegalArgumentException exception = Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> killAuditLog = new KillAuditLog(mockAuditManager, 
druidCoordinatorConfig)
+    );
+    Assert.assertEquals(
+        "[druid.coordinator.kill.audit.period] must be greater than"
+        + " [druid.coordinator.period.metadataStoreManagementPeriod]",
+        exception.getMessage()
+    );
   }
 
   @Test
@@ -108,8 +116,13 @@ public class KillAuditLogTest
         .withCoordinatorKillMaxSegments(10)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("coordinator audit kill retainDuration must be >= 
0");
-    killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+    final IllegalArgumentException exception = Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> killAuditLog = new KillAuditLog(mockAuditManager, 
druidCoordinatorConfig)
+    );
+    Assert.assertEquals(
+        "[druid.coordinator.kill.audit.durationToRetain] must be 0 
milliseconds or higher",
+        exception.getMessage()
+    );
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
index 916f3ed5e6..6d201aa9d6 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
@@ -25,8 +25,6 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.druid.common.config.ConfigManager;
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.metadata.MetadataStorageConnector;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
@@ -35,21 +33,19 @@ import 
org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
 import org.joda.time.Duration;
 import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
 
 @RunWith(MockitoJUnitRunner.class)
 public class KillCompactionConfigTest
@@ -57,9 +53,6 @@ public class KillCompactionConfigTest
   @Mock
   private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
 
-  @Mock
-  private ServiceEmitter mockServiceEmitter;
-
   @Mock
   private SqlSegmentsMetadataManager mockSqlSegmentsMetadataManager;
 
@@ -72,15 +65,15 @@ public class KillCompactionConfigTest
   @Mock
   private MetadataStorageTablesConfig mockConnectorConfig;
 
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
   private KillCompactionConfig killCompactionConfig;
+  private CoordinatorRunStats runStats;
 
   @Before
   public void setup()
   {
+    runStats = new CoordinatorRunStats();
     
Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config");
+    
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
   }
 
   @Test
@@ -102,7 +95,7 @@ public class KillCompactionConfigTest
     killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
     Mockito.verifyNoInteractions(mockSqlSegmentsMetadataManager);
     Mockito.verifyNoInteractions(mockJacksonConfigManager);
-    Mockito.verifyNoInteractions(mockServiceEmitter);
+    Assert.assertEquals(0, runStats.rowCount());
   }
 
   @Test
@@ -114,14 +107,21 @@ public class KillCompactionConfigTest
         .withCoordinatorKillMaxSegments(10)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Coordinator compaction configuration kill period 
must be >= druid.coordinator.period.metadataStoreManagementPeriod");
-    killCompactionConfig = new KillCompactionConfig(
-        druidCoordinatorConfig,
-        mockSqlSegmentsMetadataManager,
-        mockJacksonConfigManager,
-        mockConnector,
-        mockConnectorConfig
+
+    final IllegalArgumentException exception = Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> killCompactionConfig = new KillCompactionConfig(
+            druidCoordinatorConfig,
+            mockSqlSegmentsMetadataManager,
+            mockJacksonConfigManager,
+            mockConnector,
+            mockConnectorConfig
+        )
+    );
+    Assert.assertEquals(
+        "[druid.coordinator.kill.compaction.period] must be greater than"
+        + " [druid.coordinator.period.metadataStoreManagementPeriod]",
+        exception.getMessage()
     );
   }
 
@@ -129,7 +129,6 @@ public class KillCompactionConfigTest
   @Test
   public void testRunDoNothingIfCurrentConfigIsEmpty()
   {
-    
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
     // Set current compaction config to an empty compaction config
     Mockito.when(mockConnector.lookup(
         ArgumentMatchers.anyString(),
@@ -158,10 +157,9 @@ public class KillCompactionConfigTest
     );
     killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
     Mockito.verifyNoInteractions(mockSqlSegmentsMetadataManager);
-    final ArgumentCaptor<ServiceEventBuilder> emittedEventCaptor = 
ArgumentCaptor.forClass(ServiceEventBuilder.class);
-    Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture());
-    Assert.assertEquals(KillCompactionConfig.COUNT_METRIC, 
emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric"));
-    Assert.assertEquals(0, 
emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
+    Assert.assertTrue(runStats.hasStat(Stats.Kill.COMPACTION_CONFIGS));
+    Assert.assertEquals(0, runStats.get(Stats.Kill.COMPACTION_CONFIGS));
+
     Mockito.verify(mockJacksonConfigManager).convertByteToConfig(
         ArgumentMatchers.eq(null),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
@@ -223,7 +221,6 @@ public class KillCompactionConfigTest
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
     ).thenReturn(originalCurrentConfig);
-    
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
     
Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of(activeDatasourceName));
     final ArgumentCaptor<byte[]> oldConfigCaptor = 
ArgumentCaptor.forClass(byte[].class);
     final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = 
ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
@@ -257,11 +254,7 @@ public class KillCompactionConfigTest
     Assert.assertEquals(1, 
newConfigCaptor.getValue().getCompactionConfigs().size());
 
     Assert.assertEquals(activeDatasourceConfig, 
newConfigCaptor.getValue().getCompactionConfigs().get(0));
-    final ArgumentCaptor<ServiceEventBuilder> emittedEventCaptor = 
ArgumentCaptor.forClass(ServiceEventBuilder.class);
-    Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture());
-    Assert.assertEquals(KillCompactionConfig.COUNT_METRIC, 
emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric"));
-    // Should delete 1 config
-    Assert.assertEquals(1, 
emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
+    Assert.assertEquals(1, runStats.get(Stats.Kill.COMPACTION_CONFIGS));
 
     Mockito.verify(mockJacksonConfigManager).convertByteToConfig(
         ArgumentMatchers.eq(originalCurrentConfigBytes),
@@ -317,27 +310,20 @@ public class KillCompactionConfigTest
         ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
         ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
     ).thenReturn(originalCurrentConfig);
-    
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
     
Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of());
     Mockito.when(mockJacksonConfigManager.set(
         ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
         ArgumentMatchers.any(byte[].class),
         ArgumentMatchers.any(CoordinatorCompactionConfig.class),
         ArgumentMatchers.any())
-    ).thenAnswer(new Answer() {
-      private int count = 0;
-      @Override
-      public Object answer(InvocationOnMock invocation)
-      {
-        if (count++ < 3) {
-          // Return fail result with RetryableException the first three call 
to updated set
-          return ConfigManager.SetResult.fail(new Exception(), true);
-        } else {
-          // Return success ok on the fourth call to set updated config
-          return ConfigManager.SetResult.ok();
-        }
-      }
-    });
+    ).thenReturn(
+        // Return fail result with RetryableException the first three calls to updated set
+        ConfigManager.SetResult.fail(new Exception(), true),
+        ConfigManager.SetResult.fail(new Exception(), true),
+        ConfigManager.SetResult.fail(new Exception(), true),
+        // Return success ok on the fourth call to set updated config
+        ConfigManager.SetResult.ok()
+    );
 
     TestDruidCoordinatorConfig druidCoordinatorConfig = new 
TestDruidCoordinatorConfig.Builder()
         .withMetadataStoreManagementPeriod(new Duration("PT5S"))
@@ -354,12 +340,8 @@ public class KillCompactionConfigTest
     );
     killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
 
-    // Verify and Assert
-    final ArgumentCaptor<ServiceEventBuilder> emittedEventCaptor = 
ArgumentCaptor.forClass(ServiceEventBuilder.class);
-    Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture());
-    Assert.assertEquals(KillCompactionConfig.COUNT_METRIC, 
emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric"));
-    // Should delete 1 config
-    Assert.assertEquals(1, 
emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
+    // Verify that 1 config has been deleted
+    Assert.assertEquals(1, runStats.get(Stats.Kill.COMPACTION_CONFIGS));
 
     // Should call convertByteToConfig and lookup (to refresh current compaction config) four times due to RetryableException when failed
     Mockito.verify(mockJacksonConfigManager, Mockito.times(4)).convertByteToConfig(
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
index 9e49603315..222d5a715f 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
@@ -21,16 +21,15 @@ package org.apache.druid.server.coordinator.duty;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.metadata.MetadataSupervisorManager;
-import org.apache.druid.metadata.TestSupervisorSpec;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
 import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
@@ -49,16 +48,15 @@ public class KillDatasourceMetadataTest
   @Mock
   private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
 
-  @Mock
-  private TestSupervisorSpec mockKinesisSupervisorSpec;
-
-  @Mock
-  private ServiceEmitter mockServiceEmitter;
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
   private KillDatasourceMetadata killDatasourceMetadata;
+  private CoordinatorRunStats runStats;
+
+  @Before
+  public void setup()
+  {
+    runStats = new CoordinatorRunStats();
+    Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
+  }
 
   @Test
   public void testRunSkipIfLastRunLessThanPeriod()
@@ -79,7 +77,7 @@ public class KillDatasourceMetadataTest
   @Test
   public void testRunNotSkipIfLastRunMoreThanPeriod()
   {
-    
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
+    
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
 
     TestDruidCoordinatorConfig druidCoordinatorConfig = new 
TestDruidCoordinatorConfig.Builder()
         .withMetadataStoreManagementPeriod(new Duration("PT5S"))
@@ -88,10 +86,14 @@ public class KillDatasourceMetadataTest
         .withCoordinatorKillMaxSegments(10)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
-    killDatasourceMetadata = new 
KillDatasourceMetadata(druidCoordinatorConfig, 
mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
+    killDatasourceMetadata = new KillDatasourceMetadata(
+        druidCoordinatorConfig,
+        mockIndexerMetadataStorageCoordinator,
+        mockMetadataSupervisorManager
+    );
     killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
     
Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(),
 ArgumentMatchers.anySet());
-    
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+    Assert.assertTrue(runStats.hasStat(Stats.Kill.DATASOURCES));
   }
 
   @Test
@@ -104,9 +106,20 @@ public class KillDatasourceMetadataTest
         .withCoordinatorKillMaxSegments(10)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Coordinator datasource metadata kill period must 
be >= druid.coordinator.period.metadataStoreManagementPeriod");
-    killDatasourceMetadata = new 
KillDatasourceMetadata(druidCoordinatorConfig, 
mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
+
+    final IllegalArgumentException exception = Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> killDatasourceMetadata = new KillDatasourceMetadata(
+            druidCoordinatorConfig,
+            mockIndexerMetadataStorageCoordinator,
+            mockMetadataSupervisorManager
+        )
+    );
+    Assert.assertEquals(
+        "[druid.coordinator.kill.datasource.period] must be greater than"
+        + " [druid.coordinator.period.metadataStoreManagementPeriod]",
+        exception.getMessage()
+    );
   }
 
   @Test
@@ -119,16 +132,23 @@ public class KillDatasourceMetadataTest
         .withCoordinatorKillMaxSegments(10)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Coordinator datasource metadata kill 
retainDuration must be >= 0");
-    killDatasourceMetadata = new 
KillDatasourceMetadata(druidCoordinatorConfig, 
mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
+    final IllegalArgumentException exception = Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> killDatasourceMetadata = new KillDatasourceMetadata(
+            druidCoordinatorConfig,
+            mockIndexerMetadataStorageCoordinator,
+            mockMetadataSupervisorManager
+        )
+    );
+    Assert.assertEquals(
+        "[druid.coordinator.kill.datasource.durationToRetain] must be 0 milliseconds or higher",
+        exception.getMessage()
+    );
   }
 
   @Test
   public void testRunWithEmptyFilterExcludedDatasource()
   {
-    
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
-
     TestDruidCoordinatorConfig druidCoordinatorConfig = new 
TestDruidCoordinatorConfig.Builder()
         .withMetadataStoreManagementPeriod(new Duration("PT5S"))
         .withCoordinatorDatasourceKillPeriod(new Duration("PT6S"))
@@ -139,6 +159,6 @@ public class KillDatasourceMetadataTest
     killDatasourceMetadata = new 
KillDatasourceMetadata(druidCoordinatorConfig, 
mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
     killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
     
Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(),
 ArgumentMatchers.eq(ImmutableSet.of()));
-    
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+    Assert.assertTrue(runStats.hasStat(Stats.Kill.DATASOURCES));
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
index 6d98cfe0b5..f1663dffda 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
@@ -19,16 +19,15 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
 import org.joda.time.Duration;
+import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
@@ -44,18 +43,14 @@ public class KillRulesTest
   @Mock
   private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
 
-  @Mock
-  private ServiceEmitter mockServiceEmitter;
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
   private KillRules killRules;
+  private CoordinatorRunStats runStats;
 
   @Before
   public void setup()
   {
-    
Mockito.when(mockDruidCoordinatorRuntimeParams.getDatabaseRuleManager()).thenReturn(mockRuleManager);
+    runStats = new CoordinatorRunStats();
+    Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
   }
 
   @Test
@@ -68,7 +63,7 @@ public class KillRulesTest
         .withCoordinatorKillMaxSegments(10)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
-    killRules = new KillRules(druidCoordinatorConfig);
+    killRules = new KillRules(druidCoordinatorConfig, mockRuleManager);
     killRules.run(mockDruidCoordinatorRuntimeParams);
     Mockito.verifyNoInteractions(mockRuleManager);
   }
@@ -76,7 +71,6 @@ public class KillRulesTest
   @Test
   public void testRunNotSkipIfLastRunMoreThanPeriod()
   {
-    
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
     TestDruidCoordinatorConfig druidCoordinatorConfig = new 
TestDruidCoordinatorConfig.Builder()
         .withMetadataStoreManagementPeriod(new Duration("PT5S"))
         .withCoordinatorRuleKillPeriod(new Duration("PT6S"))
@@ -84,10 +78,10 @@ public class KillRulesTest
         .withCoordinatorKillMaxSegments(10)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
-    killRules = new KillRules(druidCoordinatorConfig);
+    killRules = new KillRules(druidCoordinatorConfig, mockRuleManager);
     killRules.run(mockDruidCoordinatorRuntimeParams);
     
Mockito.verify(mockRuleManager).removeRulesForEmptyDatasourcesOlderThan(ArgumentMatchers.anyLong());
-    
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+    Assert.assertTrue(runStats.hasStat(Stats.Kill.RULES));
   }
 
   @Test
@@ -100,9 +94,15 @@ public class KillRulesTest
         .withCoordinatorKillMaxSegments(10)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("coordinator rule kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod");
-    killRules = new KillRules(druidCoordinatorConfig);
+    final IllegalArgumentException exception = Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> killRules = new KillRules(druidCoordinatorConfig, mockRuleManager)
+    );
+    Assert.assertEquals(
+        "[druid.coordinator.kill.rule.period] must be greater than"
+        + " [druid.coordinator.period.metadataStoreManagementPeriod]",
+        exception.getMessage()
+    );
   }
 
   @Test
@@ -115,8 +115,13 @@ public class KillRulesTest
         .withCoordinatorKillMaxSegments(10)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("coordinator rule kill retainDuration must be >= 
0");
-    killRules = new KillRules(druidCoordinatorConfig);
+    final IllegalArgumentException exception = Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> killRules = new KillRules(druidCoordinatorConfig, mockRuleManager)
+    );
+    Assert.assertEquals(
+        "[druid.coordinator.kill.rule.durationToRetain] must be 0 milliseconds or higher",
+        exception.getMessage()
+    );
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDutyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDutyTest.java
index a07c4f676a..e93e04f1b3 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDutyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDutyTest.java
@@ -19,15 +19,15 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
 import org.joda.time.Duration;
 import org.junit.Assert;
-import org.junit.Rule;
+import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
@@ -44,43 +44,70 @@ public class KillSupervisorsCustomDutyTest
   private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
 
   @Mock
-  private ServiceEmitter mockServiceEmitter;
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
+  private DruidCoordinatorConfig coordinatorConfig;
 
   private KillSupervisorsCustomDuty killSupervisors;
 
+  @Before
+  public void setup()
+  {
+    Mockito.when(coordinatorConfig.getCoordinatorMetadataStoreManagementPeriod())
+           .thenReturn(new Duration(3600 * 1000));
+  }
+
   @Test
   public void testConstructorFailIfRetainDurationNull()
   {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("(Custom Duty) Coordinator supervisor kill 
retainDuration must be >= 0");
-    killSupervisors = new KillSupervisorsCustomDuty(null, 
mockMetadataSupervisorManager);
+    final IllegalArgumentException exception = Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> killSupervisors = new KillSupervisorsCustomDuty(null, mockMetadataSupervisorManager, coordinatorConfig)
+    );
+    Assert.assertEquals(
+        "[KillSupervisorsCustomDuty.durationToRetain] must be 0 milliseconds or higher",
+        exception.getMessage()
+    );
   }
 
   @Test
   public void testConstructorFailIfRetainDurationInvalid()
   {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("(Custom Duty) Coordinator supervisor kill 
retainDuration must be >= 0");
-    killSupervisors = new KillSupervisorsCustomDuty(new Duration("PT-1s"), 
mockMetadataSupervisorManager);
+    final IllegalArgumentException exception = Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> killSupervisors = new KillSupervisorsCustomDuty(
+            new Duration("PT-1S"),
+            mockMetadataSupervisorManager,
+            coordinatorConfig
+        )
+    );
+    Assert.assertEquals(
+        "[KillSupervisorsCustomDuty.durationToRetain] must be 0 milliseconds or higher",
+        exception.getMessage()
+    );
   }
 
   @Test
   public void testConstructorSuccess()
   {
-    killSupervisors = new KillSupervisorsCustomDuty(new Duration("PT1S"), 
mockMetadataSupervisorManager);
+    killSupervisors = new KillSupervisorsCustomDuty(
+        new Duration("PT1S"),
+        mockMetadataSupervisorManager,
+        coordinatorConfig
+    );
     Assert.assertNotNull(killSupervisors);
   }
 
   @Test
   public void testRun()
   {
-    
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
-    killSupervisors = new KillSupervisorsCustomDuty(new Duration("PT1S"), 
mockMetadataSupervisorManager);
+    final CoordinatorRunStats runStats = new CoordinatorRunStats();
+    Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
+    killSupervisors = new KillSupervisorsCustomDuty(
+        new Duration("PT1S"),
+        mockMetadataSupervisorManager,
+        coordinatorConfig
+    );
     killSupervisors.run(mockDruidCoordinatorRuntimeParams);
     
Mockito.verify(mockMetadataSupervisorManager).removeTerminatedSupervisorsOlderThan(ArgumentMatchers.anyLong());
-    
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+    Assert.assertTrue(runStats.hasStat(Stats.Kill.SUPERVISOR_SPECS));
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
index 0f7c2fff7e..bbe92a2cd1 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
@@ -19,15 +19,15 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
 import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
@@ -43,13 +43,15 @@ public class KillSupervisorsTest
   @Mock
   private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
 
-  @Mock
-  private ServiceEmitter mockServiceEmitter;
-
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
   private KillSupervisors killSupervisors;
+  private CoordinatorRunStats runStats;
+
+  @Before
+  public void setup()
+  {
+    runStats = new CoordinatorRunStats();
+    Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
+  }
 
   @Test
   public void testRunSkipIfLastRunLessThanPeriod()
@@ -69,7 +71,6 @@ public class KillSupervisorsTest
   @Test
   public void testRunNotSkipIfLastRunMoreThanPeriod()
   {
-    
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
     TestDruidCoordinatorConfig druidCoordinatorConfig = new 
TestDruidCoordinatorConfig.Builder()
         .withMetadataStoreManagementPeriod(new Duration("PT5S"))
         .withCoordinatorSupervisorKillPeriod(new Duration("PT6S"))
@@ -80,7 +81,7 @@ public class KillSupervisorsTest
     killSupervisors = new KillSupervisors(druidCoordinatorConfig, 
mockMetadataSupervisorManager);
     killSupervisors.run(mockDruidCoordinatorRuntimeParams);
     
Mockito.verify(mockMetadataSupervisorManager).removeTerminatedSupervisorsOlderThan(ArgumentMatchers.anyLong());
-    
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+    Assert.assertTrue(runStats.hasStat(Stats.Kill.SUPERVISOR_SPECS));
   }
 
   @Test
@@ -93,9 +94,15 @@ public class KillSupervisorsTest
         .withCoordinatorKillMaxSegments(10)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Coordinator supervisor kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod");
-    killSupervisors = new KillSupervisors(druidCoordinatorConfig, 
mockMetadataSupervisorManager);
+    final IllegalArgumentException exception = Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager)
+    );
+    Assert.assertEquals(
+        "[druid.coordinator.kill.supervisor.period] must be greater than"
+        + " [druid.coordinator.period.metadataStoreManagementPeriod]",
+        exception.getMessage()
+    );
   }
 
   @Test
@@ -108,8 +115,14 @@ public class KillSupervisorsTest
         .withCoordinatorKillMaxSegments(10)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Coordinator supervisor kill retainDuration must 
be >= 0");
-    killSupervisors = new KillSupervisors(druidCoordinatorConfig, 
mockMetadataSupervisorManager);
+
+    final IllegalArgumentException exception = Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager)
+    );
+    Assert.assertEquals(
+        "[druid.coordinator.kill.supervisor.durationToRetain] must be 0 milliseconds or higher",
+        exception.getMessage()
+    );
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to