This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2d8e0f28f3 Refactor: Cleanup coordinator duties for metadata cleanup
(#14631)
2d8e0f28f3 is described below
commit 2d8e0f28f36f83e091911cc7b36a57b10fd065cc
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Aug 5 13:08:23 2023 +0530
Refactor: Cleanup coordinator duties for metadata cleanup (#14631)
Changes
- Add abstract class `MetadataCleanupDuty`
- Make `KillAuditLogs`, `KillCompactionConfig`, etc extend
`MetadataCleanupDuty`
- Improve log and error messages
- Cleanup tests
- No functional change
---
docs/development/modules.md | 4 +-
.../test-groups/custom-coordinator-duties | 4 +-
.../druid/server/coordinator/DruidCoordinator.java | 1 -
.../coordinator/DruidCoordinatorRuntimeParams.java | 20 ---
.../server/coordinator/duty/KillAuditLog.java | 57 ++------
.../coordinator/duty/KillCompactionConfig.java | 160 +++++++++------------
.../coordinator/duty/KillDatasourceMetadata.java | 92 ++++--------
.../druid/server/coordinator/duty/KillRules.java | 62 +++-----
.../server/coordinator/duty/KillSupervisors.java | 61 ++------
.../duty/KillSupervisorsCustomDuty.java | 72 +++++-----
.../coordinator/duty/MetadataCleanupDuty.java | 135 +++++++++++++++++
.../druid/server/coordinator/stats/Stats.java | 14 ++
.../coordinator/BalanceSegmentsProfiler.java | 1 -
.../server/coordinator/DruidCoordinatorTest.java | 6 +-
.../server/coordinator/duty/KillAuditLogTest.java | 49 ++++---
.../coordinator/duty/KillCompactionConfigTest.java | 88 +++++-------
.../duty/KillDatasourceMetadataTest.java | 72 ++++++----
.../server/coordinator/duty/KillRulesTest.java | 47 +++---
.../duty/KillSupervisorsCustomDutyTest.java | 63 +++++---
.../coordinator/duty/KillSupervisorsTest.java | 49 ++++---
20 files changed, 547 insertions(+), 510 deletions(-)
diff --git a/docs/development/modules.md b/docs/development/modules.md
index a0d2335194..75f4bbbe54 100644
--- a/docs/development/modules.md
+++ b/docs/development/modules.md
@@ -351,12 +351,12 @@ This config file adds the configs below to enable a
custom coordinator duty.
```
druid.coordinator.dutyGroups=["cleanupMetadata"]
druid.coordinator.cleanupMetadata.duties=["killSupervisors"]
-druid.coordinator.cleanupMetadata.duty.killSupervisors.retainDuration=PT0M
+druid.coordinator.cleanupMetadata.duty.killSupervisors.durationToRetain=PT0M
druid.coordinator.cleanupMetadata.period=PT10S
```
These configurations create a custom coordinator duty group called
`cleanupMetadata` which runs a custom coordinator duty called `killSupervisors`
every 10 seconds.
-The custom coordinator duty `killSupervisors` also has a config called
`retainDuration` which is set to 0 minute.
+The custom coordinator duty `killSupervisors` also has a config called
`durationToRetain` which is set to 0 minutes.
### Routing data through a HTTP proxy for your extension
diff --git
a/integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties
b/integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties
index 5c75c4197d..cea6370c29 100644
---
a/integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties
+++
b/integration-tests/docker/environment-configs/test-groups/custom-coordinator-duties
@@ -20,12 +20,12 @@
# If you are making a change in load list below, make the necessary changes in
github actions too
druid_extensions_loadList=["druid-kafka-indexing-service","mysql-metadata-storage","druid-s3-extensions","druid-basic-security","simple-client-sslcontext","druid-testing-tools","druid-lookups-cached-global","druid-histogram","druid-datasketches"]
-druid_coordinator_period_metadataStoreManagementPeriod=PT1H
+druid_coordinator_period_metadataStoreManagementPeriod=PT5S
druid_sql_planner_authorizeSystemTablesDirectly=false
#Testing kill supervisor custom coordinator duty
druid_coordinator_kill_supervisor_on=false
druid_coordinator_dutyGroups=["cleanupMetadata"]
druid_coordinator_cleanupMetadata_duties=["killSupervisors"]
-druid_coordinator_cleanupMetadata_duty_killSupervisors_retainDuration=PT0M
+druid_coordinator_cleanupMetadata_duty_killSupervisors_durationToRetain=PT0M
druid_coordinator_cleanupMetadata_period=PT10S
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index f8ea8ef24f..272de19010 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -699,7 +699,6 @@ public class DruidCoordinator
.withSnapshotOfDataSourcesWithAllUsedSegments(dataSourcesSnapshot)
.withDynamicConfigs(getDynamicConfigs())
.withCompactionConfig(getCompactionConfig())
- .withEmitter(emitter)
.build();
log.info(
"Initialized run params for group [%s] with [%,d] used segments in
[%d] datasources.",
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index ed1c57dbb1..79173683fa 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
@@ -68,7 +67,6 @@ public class DruidCoordinatorRuntimeParams
private final StrategicSegmentAssigner segmentAssigner;
private final @Nullable TreeSet<DataSegment> usedSegments;
private final @Nullable DataSourcesSnapshot dataSourcesSnapshot;
- private final ServiceEmitter emitter;
private final CoordinatorDynamicConfig coordinatorDynamicConfig;
private final CoordinatorCompactionConfig coordinatorCompactionConfig;
private final SegmentLoadingConfig segmentLoadingConfig;
@@ -83,7 +81,6 @@ public class DruidCoordinatorRuntimeParams
StrategicSegmentAssigner segmentAssigner,
@Nullable TreeSet<DataSegment> usedSegments,
@Nullable DataSourcesSnapshot dataSourcesSnapshot,
- ServiceEmitter emitter,
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorCompactionConfig coordinatorCompactionConfig,
SegmentLoadingConfig segmentLoadingConfig,
@@ -98,7 +95,6 @@ public class DruidCoordinatorRuntimeParams
this.segmentAssigner = segmentAssigner;
this.usedSegments = usedSegments;
this.dataSourcesSnapshot = dataSourcesSnapshot;
- this.emitter = emitter;
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.coordinatorCompactionConfig = coordinatorCompactionConfig;
this.segmentLoadingConfig = segmentLoadingConfig;
@@ -145,11 +141,6 @@ public class DruidCoordinatorRuntimeParams
return usedSegments;
}
- public ServiceEmitter getEmitter()
- {
- return emitter;
- }
-
public CoordinatorDynamicConfig getCoordinatorDynamicConfig()
{
return coordinatorDynamicConfig;
@@ -200,7 +191,6 @@ public class DruidCoordinatorRuntimeParams
segmentAssigner,
usedSegments,
dataSourcesSnapshot,
- emitter,
coordinatorDynamicConfig,
coordinatorCompactionConfig,
segmentLoadingConfig,
@@ -219,7 +209,6 @@ public class DruidCoordinatorRuntimeParams
private StrategicSegmentAssigner segmentAssigner;
private @Nullable TreeSet<DataSegment> usedSegments;
private @Nullable DataSourcesSnapshot dataSourcesSnapshot;
- private ServiceEmitter emitter;
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private CoordinatorCompactionConfig coordinatorCompactionConfig;
private SegmentLoadingConfig segmentLoadingConfig;
@@ -242,7 +231,6 @@ public class DruidCoordinatorRuntimeParams
StrategicSegmentAssigner segmentAssigner,
@Nullable TreeSet<DataSegment> usedSegments,
@Nullable DataSourcesSnapshot dataSourcesSnapshot,
- ServiceEmitter emitter,
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorCompactionConfig coordinatorCompactionConfig,
SegmentLoadingConfig segmentLoadingConfig,
@@ -257,7 +245,6 @@ public class DruidCoordinatorRuntimeParams
this.segmentAssigner = segmentAssigner;
this.usedSegments = usedSegments;
this.dataSourcesSnapshot = dataSourcesSnapshot;
- this.emitter = emitter;
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.coordinatorCompactionConfig = coordinatorCompactionConfig;
this.segmentLoadingConfig = segmentLoadingConfig;
@@ -279,7 +266,6 @@ public class DruidCoordinatorRuntimeParams
segmentAssigner,
usedSegments,
dataSourcesSnapshot,
- emitter,
coordinatorDynamicConfig,
coordinatorCompactionConfig,
segmentLoadingConfig,
@@ -369,12 +355,6 @@ public class DruidCoordinatorRuntimeParams
return this;
}
- public Builder withEmitter(ServiceEmitter emitter)
- {
- this.emitter = emitter;
- return this;
- }
-
public Builder withDynamicConfigs(CoordinatorDynamicConfig configs)
{
this.coordinatorDynamicConfig = configs;
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
index ac735c6594..754bec1d4f 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
@@ -19,23 +19,14 @@
package org.apache.druid.server.coordinator.duty;
-import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.audit.AuditManager;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
-public class KillAuditLog implements CoordinatorDuty
+public class KillAuditLog extends MetadataCleanupDuty
{
- private static final Logger log = new Logger(KillAuditLog.class);
-
- private final long period;
- private final long retainDuration;
- private long lastKillTime = 0;
-
private final AuditManager auditManager;
@Inject
@@ -44,44 +35,20 @@ public class KillAuditLog implements CoordinatorDuty
DruidCoordinatorConfig config
)
{
- this.period = config.getCoordinatorAuditKillPeriod().getMillis();
- Preconditions.checkArgument(
- this.period >=
config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
- "coordinator audit kill period must be >=
druid.coordinator.period.metadataStoreManagementPeriod"
- );
- this.retainDuration =
config.getCoordinatorAuditKillDurationToRetain().getMillis();
- Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit
kill retainDuration must be >= 0");
- Preconditions.checkArgument(this.retainDuration <
System.currentTimeMillis(), "Coordinator audit kill retainDuration cannot be
greater than current time in ms");
- log.debug(
- "Audit Kill Task scheduling enabled with period [%s], retainDuration
[%s]",
- this.period,
- this.retainDuration
+ super(
+ "audit logs",
+ "druid.coordinator.kill.audit",
+ config.getCoordinatorAuditKillPeriod(),
+ config.getCoordinatorAuditKillDurationToRetain(),
+ Stats.Kill.AUDIT_LOGS,
+ config
);
this.auditManager = auditManager;
}
@Override
- public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
+ protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
{
- long currentTimeMillis = System.currentTimeMillis();
- if ((lastKillTime + period) < currentTimeMillis) {
- lastKillTime = currentTimeMillis;
- long timestamp = currentTimeMillis - retainDuration;
- try {
- int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp);
- ServiceEmitter emitter = params.getEmitter();
- emitter.emit(
- new ServiceMetricEvent.Builder().build(
- "metadata/kill/audit/count",
- auditRemoved
- )
- );
- log.info("Finished running KillAuditLog duty. Removed %,d audit logs",
auditRemoved);
- }
- catch (Exception e) {
- log.error(e, "Failed to kill audit log");
- }
- }
- return params;
+ return auditManager.removeAuditLogsOlderThan(minCreatedTime.getMillis());
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
index 74645b51b4..4b92cfa9b9 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java
@@ -19,7 +19,6 @@
package org.apache.druid.server.coordinator.duty;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.audit.AuditInfo;
@@ -28,15 +27,15 @@ import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.java.util.RetryableException;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
import java.util.Map;
import java.util.Set;
@@ -48,16 +47,11 @@ import java.util.stream.Collectors;
* Note that this will delete compaction configuration for inactive datasources
* (datasource with no used and unused segments) immediately.
*/
-public class KillCompactionConfig implements CoordinatorDuty
+public class KillCompactionConfig extends MetadataCleanupDuty
{
private static final Logger log = new Logger(KillCompactionConfig.class);
private static final int UPDATE_NUM_RETRY = 5;
- static final String COUNT_METRIC = "metadata/kill/compaction/count";
-
- private final long period;
- private long lastKillTime = 0;
-
private final JacksonConfigManager jacksonConfigManager;
private final SqlSegmentsMetadataManager sqlSegmentsMetadataManager;
private final MetadataStorageConnector connector;
@@ -72,99 +66,85 @@ public class KillCompactionConfig implements CoordinatorDuty
MetadataStorageTablesConfig connectorConfig
)
{
+ super(
+ "compaction configs",
+ "druid.coordinator.kill.compaction",
+ config.getCoordinatorCompactionKillPeriod(),
+ Duration.millis(1), // Retain duration is ignored
+ Stats.Kill.COMPACTION_CONFIGS,
+ config
+ );
this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
this.jacksonConfigManager = jacksonConfigManager;
- this.period = config.getCoordinatorCompactionKillPeriod().getMillis();
this.connector = connector;
this.connectorConfig = connectorConfig;
- Preconditions.checkArgument(
- this.period >=
config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
- "Coordinator compaction configuration kill period must be >=
druid.coordinator.period.metadataStoreManagementPeriod"
- );
- log.debug(
- "Compaction Configuration Kill Task scheduling enabled with period
[%s]",
- this.period
- );
}
@Override
- public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
+ protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
{
- long currentTimeMillis = System.currentTimeMillis();
- if ((lastKillTime + period) < currentTimeMillis) {
- lastKillTime = currentTimeMillis;
- try {
- RetryUtils.retry(
- () -> {
- final byte[] currentBytes =
CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig);
- final CoordinatorCompactionConfig current =
CoordinatorCompactionConfig.convertByteToConfig(jacksonConfigManager,
currentBytes);
- // If current compaction config is empty then there is nothing
to do
- if (CoordinatorCompactionConfig.empty().equals(current)) {
- log.info(
- "Finished running KillCompactionConfig duty. Nothing to do
as compaction config is already empty.");
- emitMetric(params.getEmitter(), 0);
- return ConfigManager.SetResult.ok();
- }
-
- // Get all active datasources
- // Note that we get all active datasources after getting
compaction config to prevent race condition if new
- // datasource and config are added.
- Set<String> activeDatasources =
sqlSegmentsMetadataManager.retrieveAllDataSourceNames();
- final Map<String, DataSourceCompactionConfig> updated = current
- .getCompactionConfigs()
- .stream()
- .filter(dataSourceCompactionConfig ->
activeDatasources.contains(dataSourceCompactionConfig.getDataSource()))
-
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource,
Function.identity()));
-
- // Calculate number of compaction configs to remove for logging
- int compactionConfigRemoved =
current.getCompactionConfigs().size() - updated.size();
-
- ConfigManager.SetResult result = jacksonConfigManager.set(
- CoordinatorCompactionConfig.CONFIG_KEY,
- currentBytes,
- CoordinatorCompactionConfig.from(current,
ImmutableList.copyOf(updated.values())),
- new AuditInfo(
- "KillCompactionConfig",
- "CoordinatorDuty for automatic deletion of compaction
config",
- ""
- )
- );
- if (result.isOk()) {
- log.info(
- "Finished running KillCompactionConfig duty. Removed %,d
compaction configs",
- compactionConfigRemoved
- );
- emitMetric(params.getEmitter(), compactionConfigRemoved);
- } else if (result.isRetryable()) {
- // Failed but is retryable
- log.debug("Retrying KillCompactionConfig duty");
- throw new RetryableException(result.getException());
- } else {
- // Failed and not retryable
- log.error(result.getException(), "Failed to kill compaction
configurations");
- emitMetric(params.getEmitter(), 0);
- }
- return result;
- },
- e -> e instanceof RetryableException,
- UPDATE_NUM_RETRY
- );
- }
- catch (Exception e) {
- log.error(e, "Failed to kill compaction configurations");
- emitMetric(params.getEmitter(), 0);
- }
+ try {
+ return RetryUtils.retry(
+ this::tryDeleteCompactionConfigs,
+ e -> e instanceof RetryableException,
+ UPDATE_NUM_RETRY
+ );
+ }
+ catch (Exception e) {
+ log.error(e, "Failed to kill compaction configurations");
+ return 0;
}
- return params;
}
- private void emitMetric(ServiceEmitter emitter, int compactionConfigRemoved)
+ /**
+ * Tries to delete compaction configs for inactive datasources and returns
+ * the number of compaction configs successfully removed.
+ */
+ private int tryDeleteCompactionConfigs() throws RetryableException
{
- emitter.emit(
- new ServiceMetricEvent.Builder().build(
- COUNT_METRIC,
- compactionConfigRemoved
+ final byte[] currentBytes =
CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig);
+ final CoordinatorCompactionConfig current =
CoordinatorCompactionConfig.convertByteToConfig(
+ jacksonConfigManager,
+ currentBytes
+ );
+ // If current compaction config is empty then there is nothing to do
+ if (CoordinatorCompactionConfig.empty().equals(current)) {
+ log.info("Nothing to do as compaction config is already empty.");
+ return 0;
+ }
+
+ // Get all active datasources
+ // Note that we get all active datasources after getting compaction config
to prevent race condition if new
+ // datasource and config are added.
+ Set<String> activeDatasources =
sqlSegmentsMetadataManager.retrieveAllDataSourceNames();
+ final Map<String, DataSourceCompactionConfig> updated = current
+ .getCompactionConfigs()
+ .stream()
+ .filter(dataSourceCompactionConfig ->
activeDatasources.contains(dataSourceCompactionConfig.getDataSource()))
+ .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource,
Function.identity()));
+
+ // Calculate number of compaction configs removed
+ int compactionConfigRemoved = current.getCompactionConfigs().size() -
updated.size();
+
+ ConfigManager.SetResult result = jacksonConfigManager.set(
+ CoordinatorCompactionConfig.CONFIG_KEY,
+ currentBytes,
+ CoordinatorCompactionConfig.from(current,
ImmutableList.copyOf(updated.values())),
+ new AuditInfo(
+ "KillCompactionConfig",
+ "CoordinatorDuty for automatic deletion of compaction config",
+ ""
)
);
+
+ if (result.isOk()) {
+ return compactionConfigRemoved;
+ } else if (result.isRetryable()) {
+ log.debug("Retrying KillCompactionConfig duty");
+ throw new RetryableException(result.getException());
+ } else {
+ log.error(result.getException(), "Failed to kill compaction
configurations");
+ return 0;
+ }
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java
index 9c9535b250..7fbf1b1deb 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java
@@ -19,17 +19,14 @@
package org.apache.druid.server.coordinator.duty;
-import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
import java.util.Collection;
import java.util.Map;
@@ -42,14 +39,8 @@ import java.util.stream.Collectors;
* Note that this class relies on the supervisorSpec.getDataSources names to
match with the
* 'datasource' column of the datasource metadata table.
*/
-public class KillDatasourceMetadata implements CoordinatorDuty
+public class KillDatasourceMetadata extends MetadataCleanupDuty
{
- private static final Logger log = new Logger(KillDatasourceMetadata.class);
-
- private final long period;
- private final long retainDuration;
- private long lastKillTime = 0;
-
private final IndexerMetadataStorageCoordinator
indexerMetadataStorageCoordinator;
private final MetadataSupervisorManager metadataSupervisorManager;
@@ -60,62 +51,37 @@ public class KillDatasourceMetadata implements
CoordinatorDuty
MetadataSupervisorManager metadataSupervisorManager
)
{
+ super(
+ "datasources",
+ "druid.coordinator.kill.datasource",
+ config.getCoordinatorDatasourceKillPeriod(),
+ config.getCoordinatorDatasourceKillDurationToRetain(),
+ Stats.Kill.DATASOURCES,
+ config
+ );
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.metadataSupervisorManager = metadataSupervisorManager;
- this.period = config.getCoordinatorDatasourceKillPeriod().getMillis();
- Preconditions.checkArgument(
- this.period >=
config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
- "Coordinator datasource metadata kill period must be >=
druid.coordinator.period.metadataStoreManagementPeriod"
- );
- this.retainDuration =
config.getCoordinatorDatasourceKillDurationToRetain().getMillis();
- Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator
datasource metadata kill retainDuration must be >= 0");
- Preconditions.checkArgument(this.retainDuration <
System.currentTimeMillis(), "Coordinator datasource metadata kill
retainDuration cannot be greater than current time in ms");
- log.debug(
- "Datasource Metadata Kill Task scheduling enabled with period [%s],
retainDuration [%s]",
- this.period,
- this.retainDuration
- );
}
@Override
- public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
+ protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
{
- long currentTimeMillis = System.currentTimeMillis();
- if ((lastKillTime + period) < currentTimeMillis) {
- lastKillTime = currentTimeMillis;
- long timestamp = currentTimeMillis - retainDuration;
- try {
- // Datasource metadata only exists for datasource with supervisor
- // To determine if datasource metadata is still active, we check if
the supervisor for that particular datasource
- // is still active or not
- Map<String, SupervisorSpec> allActiveSupervisor =
metadataSupervisorManager.getLatestActiveOnly();
- Set<String> allDatasourceWithActiveSupervisor =
allActiveSupervisor.values()
-
.stream()
-
.map(supervisorSpec -> supervisorSpec.getDataSources())
-
.flatMap(Collection::stream)
-
.filter(datasource -> !Strings.isNullOrEmpty(datasource))
-
.collect(Collectors.toSet());
- // We exclude removing datasource metadata with active supervisor
- int datasourceMetadataRemovedCount =
indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan(
- timestamp,
- allDatasourceWithActiveSupervisor
- );
- ServiceEmitter emitter = params.getEmitter();
- emitter.emit(
- new ServiceMetricEvent.Builder().build(
- "metadata/kill/datasource/count",
- datasourceMetadataRemovedCount
- )
- );
- log.info(
- "Finished running KillDatasourceMetadata duty. Removed %,d
datasource metadata",
- datasourceMetadataRemovedCount
- );
- }
- catch (Exception e) {
- log.error(e, "Failed to kill datasource metadata");
- }
- }
- return params;
+ // Datasource metadata only exists for datasource with supervisor
+ // To determine if datasource metadata is still active, we check if the
supervisor for that particular datasource
+ // is still active or not
+ Map<String, SupervisorSpec> allActiveSupervisor =
metadataSupervisorManager.getLatestActiveOnly();
+ Set<String> allDatasourceWithActiveSupervisor
+ = allActiveSupervisor.values()
+ .stream()
+ .map(SupervisorSpec::getDataSources)
+ .flatMap(Collection::stream)
+ .filter(datasource ->
!Strings.isNullOrEmpty(datasource))
+ .collect(Collectors.toSet());
+
+ // We exclude removing datasource metadata with active supervisor
+ return indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan(
+ minCreatedTime.getMillis(),
+ allDatasourceWithActiveSupervisor
+ );
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java
index 50b1740e86..40964959b3 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java
@@ -19,64 +19,36 @@
package org.apache.druid.server.coordinator.duty;
-import com.google.common.base.Preconditions;
import com.google.inject.Inject;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
-public class KillRules implements CoordinatorDuty
+public class KillRules extends MetadataCleanupDuty
{
- private static final Logger log = new Logger(KillRules.class);
-
- private final long period;
- private final long retainDuration;
- private long lastKillTime = 0;
+ private final MetadataRuleManager metadataRuleManager;
@Inject
public KillRules(
- DruidCoordinatorConfig config
+ DruidCoordinatorConfig config,
+ MetadataRuleManager metadataRuleManager
)
{
- this.period = config.getCoordinatorRuleKillPeriod().getMillis();
- Preconditions.checkArgument(
- this.period >=
config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
- "coordinator rule kill period must be >=
druid.coordinator.period.metadataStoreManagementPeriod"
- );
- this.retainDuration =
config.getCoordinatorRuleKillDurationToRetain().getMillis();
- Preconditions.checkArgument(this.retainDuration >= 0, "coordinator rule
kill retainDuration must be >= 0");
- Preconditions.checkArgument(this.retainDuration <
System.currentTimeMillis(), "Coordinator rule kill retainDuration cannot be
greater than current time in ms");
- log.debug(
- "Rule Kill Task scheduling enabled with period [%s], retainDuration
[%s]",
- this.period,
- this.retainDuration
+ super(
+ "rules",
+ "druid.coordinator.kill.rule",
+ config.getCoordinatorRuleKillPeriod(),
+ config.getCoordinatorRuleKillDurationToRetain(),
+ Stats.Kill.RULES,
+ config
);
+ this.metadataRuleManager = metadataRuleManager;
}
@Override
- public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
+ protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
{
- long currentTimeMillis = System.currentTimeMillis();
- if ((lastKillTime + period) < currentTimeMillis) {
- lastKillTime = currentTimeMillis;
- long timestamp = currentTimeMillis - retainDuration;
- try {
- int ruleRemoved =
params.getDatabaseRuleManager().removeRulesForEmptyDatasourcesOlderThan(timestamp);
- ServiceEmitter emitter = params.getEmitter();
- emitter.emit(
- new ServiceMetricEvent.Builder().build(
- "metadata/kill/rule/count",
- ruleRemoved
- )
- );
- log.info("Finished running KillRules duty. Removed %,d rule",
ruleRemoved);
- }
- catch (Exception e) {
- log.error(e, "Failed to kill rules metadata");
- }
- }
- return params;
+ return
metadataRuleManager.removeRulesForEmptyDatasourcesOlderThan(minCreatedTime.getMillis());
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
index 3f87f5dee8..7f6c43d5ee 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java
@@ -19,26 +19,17 @@
package org.apache.druid.server.coordinator.duty;
-import com.google.common.base.Preconditions;
import com.google.inject.Inject;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
/**
- * CoordinatorDuty for automatic deletion of terminated supervisors from the
supervisor table in metadata storage.
+ * Cleans up terminated supervisors from the supervisors table in metadata
storage.
*/
-public class KillSupervisors implements CoordinatorDuty
+public class KillSupervisors extends MetadataCleanupDuty
{
- private static final Logger log = new Logger(KillSupervisors.class);
-
- private final long period;
- private final long retainDuration;
- private long lastKillTime = 0;
-
private final MetadataSupervisorManager metadataSupervisorManager;
@Inject
@@ -47,44 +38,20 @@ public class KillSupervisors implements CoordinatorDuty
MetadataSupervisorManager metadataSupervisorManager
)
{
- this.metadataSupervisorManager = metadataSupervisorManager;
- this.period = config.getCoordinatorSupervisorKillPeriod().getMillis();
- Preconditions.checkArgument(
- this.period >=
config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
- "Coordinator supervisor kill period must be >=
druid.coordinator.period.metadataStoreManagementPeriod"
- );
- this.retainDuration =
config.getCoordinatorSupervisorKillDurationToRetain().getMillis();
- Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator
supervisor kill retainDuration must be >= 0");
- Preconditions.checkArgument(this.retainDuration <
System.currentTimeMillis(), "Coordinator supervisor kill retainDuration cannot
be greater than current time in ms");
- log.debug(
- "Supervisor Kill Task scheduling enabled with period [%s],
retainDuration [%s]",
- this.period,
- this.retainDuration
+ super(
+ "supervisors",
+ "druid.coordinator.kill.supervisor",
+ config.getCoordinatorSupervisorKillPeriod(),
+ config.getCoordinatorSupervisorKillDurationToRetain(),
+ Stats.Kill.SUPERVISOR_SPECS,
+ config
);
+ this.metadataSupervisorManager = metadataSupervisorManager;
}
@Override
- public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
+ protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
{
- long currentTimeMillis = System.currentTimeMillis();
- if ((lastKillTime + period) < currentTimeMillis) {
- lastKillTime = currentTimeMillis;
- long timestamp = currentTimeMillis - retainDuration;
- try {
- int supervisorRemoved =
metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp);
- ServiceEmitter emitter = params.getEmitter();
- emitter.emit(
- new ServiceMetricEvent.Builder().build(
- "metadata/kill/supervisor/count",
- supervisorRemoved
- )
- );
- log.info("Finished running KillSupervisors duty. Removed %,d
supervisor specs", supervisorRemoved);
- }
- catch (Exception e) {
- log.error(e, "Failed to kill terminated supervisor metadata");
- }
- }
- return params;
+ return
metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(minCreatedTime.getMillis());
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDuty.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDuty.java
index 7a5b1d6970..7d645967be 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDuty.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDuty.java
@@ -22,64 +22,58 @@ package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
+import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataSupervisorManager;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.joda.time.DateTime;
import org.joda.time.Duration;
/**
- * CoordinatorDuty for automatic deletion of terminated supervisors from the
supervisor table in metadata storage.
- * This class has the same purpose as {@link KillSupervisors} but uses a
different configuration style as
- * detailed in {@link CoordinatorCustomDuty}. This class primary purpose is as
an example to demostrate the usuage
- * of the {@link CoordinatorCustomDuty} {@link
org.apache.druid.guice.annotations.ExtensionPoint}
- *
- * Production use case should still use {@link KillSupervisors}. In the
future, we might migrate all metadata
- * management coordinator duties to {@link CoordinatorCustomDuty} but until
then this class will remains undocumented
- * and should not be use in production.
+ * Example {@link CoordinatorCustomDuty} for automatic deletion of terminated
+ * supervisors from the metadata storage. This duty has the same implementation
+ * as {@link KillSupervisors} but uses a different configuration style as
+ * detailed in {@link CoordinatorCustomDuty}.
+ * <p>
+ * This duty is only an example to demonstrate the usage of coordinator custom
+ * duties. All production clusters should continue using {@link
KillSupervisors}.
+ * <p>
+ * In the future, we might migrate all metadata management coordinator duties
to
+ * {@link CoordinatorCustomDuty} but until then this class will remain
undocumented.
*/
-public class KillSupervisorsCustomDuty implements CoordinatorCustomDuty
+@UnstableApi
+public class KillSupervisorsCustomDuty extends MetadataCleanupDuty implements
CoordinatorCustomDuty
{
private static final Logger log = new
Logger(KillSupervisorsCustomDuty.class);
- private final Duration retainDuration;
private final MetadataSupervisorManager metadataSupervisorManager;
@JsonCreator
public KillSupervisorsCustomDuty(
- @JsonProperty("retainDuration") Duration retainDuration,
- @JacksonInject MetadataSupervisorManager metadataSupervisorManager
+ @JsonProperty("durationToRetain") Duration retainDuration,
+ @JacksonInject MetadataSupervisorManager metadataSupervisorManager,
+ @JacksonInject DruidCoordinatorConfig coordinatorConfig
)
{
- this.metadataSupervisorManager = metadataSupervisorManager;
- this.retainDuration = retainDuration;
- Preconditions.checkArgument(this.retainDuration != null &&
this.retainDuration.getMillis() >= 0, "(Custom Duty) Coordinator supervisor
kill retainDuration must be >= 0");
- log.info(
- "Supervisor Kill Task scheduling enabled with retainDuration [%s]",
- this.retainDuration
+ super(
+ "supervisors",
+ "KillSupervisorsCustomDuty",
+ // Use the same period as metadata store management so that validation
passes
+ // Actual period of custom duties is configured by the user
+ coordinatorConfig.getCoordinatorMetadataStoreManagementPeriod(),
+ retainDuration,
+ Stats.Kill.SUPERVISOR_SPECS,
+ coordinatorConfig
);
+ this.metadataSupervisorManager = metadataSupervisorManager;
+ log.warn("This is only an example implementation of a custom duty and"
+ + " must not be used in production. Use KillSupervisors duty
instead.");
}
@Override
- public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
+ protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
{
- long timestamp = System.currentTimeMillis() - retainDuration.getMillis();
- try {
- int supervisorRemoved =
metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp);
- ServiceEmitter emitter = params.getEmitter();
- emitter.emit(
- new ServiceMetricEvent.Builder().build(
- "metadata/kill/supervisor/count",
- supervisorRemoved
- )
- );
- log.info("Finished running KillSupervisors duty. Removed %,d supervisor
specs", supervisorRemoved);
- }
- catch (Exception e) {
- log.error(e, "Failed to kill terminated supervisor metadata");
- }
- return params;
+ return
metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(minCreatedTime.getMillis());
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/MetadataCleanupDuty.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/MetadataCleanupDuty.java
new file mode 100644
index 0000000000..3206f11003
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/MetadataCleanupDuty.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.CoordinatorStat;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+
+/**
+ * Performs cleanup of stale metadata entries created before a configured
retain duration.
+ * <p>
+ * In every invocation of {@link #run}, the duty checks if the {@link
#cleanupPeriod}
+ * has elapsed since the {@link #lastCleanupTime}. If it has, then the method
+ * {@link #cleanupEntriesCreatedBefore(DateTime)} is invoked. Otherwise, the
duty
+ * completes immediately without making any changes.
+ */
+public abstract class MetadataCleanupDuty implements CoordinatorDuty
+{
+ private static final Logger log = new Logger(MetadataCleanupDuty.class);
+
+ private final String propertyPrefix;
+ private final String entryType;
+ private final CoordinatorStat cleanupCountStat;
+
+ private final Duration cleanupPeriod;
+ private final Duration retainDuration;
+
+ private DateTime lastCleanupTime = DateTimes.utc(0);
+
+ protected MetadataCleanupDuty(
+ String entryType,
+ String propertyPrefix,
+ Duration cleanupPeriod,
+ Duration retainDuration,
+ CoordinatorStat cleanupCountStat,
+ DruidCoordinatorConfig coordinatorConfig
+ )
+ {
+ this.propertyPrefix = propertyPrefix;
+ this.entryType = entryType;
+ this.cleanupPeriod = cleanupPeriod;
+ this.retainDuration = retainDuration;
+ this.cleanupCountStat = cleanupCountStat;
+
+ validatePeriod(cleanupPeriod,
coordinatorConfig.getCoordinatorMetadataStoreManagementPeriod());
+ validateRetainDuration(retainDuration);
+
+ log.debug(
+ "Enabled cleanup of [%s] with period [%s] and durationToRetain [%s].",
+ entryType, cleanupPeriod, retainDuration
+ );
+ }
+
+ @Nullable
+ @Override
+ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
+ {
+ final DateTime now = DateTimes.nowUtc();
+
+ // Perform cleanup only if cleanup period has elapsed
+ if (lastCleanupTime.plus(cleanupPeriod).isBefore(now)) {
+ lastCleanupTime = now;
+
+ try {
+ DateTime minCreatedTime = now.minus(retainDuration);
+ int deletedEntries = cleanupEntriesCreatedBefore(minCreatedTime);
+ log.info("Removed [%,d] [%s] created before [%s].", deletedEntries,
entryType, minCreatedTime);
+
+ params.getCoordinatorStats().add(cleanupCountStat, deletedEntries);
+ }
+ catch (Exception e) {
+ log.error(e, "Failed to perform cleanup of [%s]", entryType);
+ }
+ }
+
+ return params;
+ }
+
+ /**
+ * Cleans up metadata entries created before the {@code minCreatedTime}.
+ * <p>
+ * This method is not invoked if the {@link #cleanupPeriod} has not elapsed
+ * since the {@link #lastCleanupTime}.
+ *
+ * @return Number of deleted metadata entries
+ */
+ protected abstract int cleanupEntriesCreatedBefore(DateTime minCreatedTime);
+
+ private void validatePeriod(Duration period, Duration
metadataManagementPeriod)
+ {
+ Preconditions.checkArgument(
+ period != null && period.getMillis() >=
metadataManagementPeriod.getMillis(),
+ "[%s.period] must be greater than
[druid.coordinator.period.metadataStoreManagementPeriod]",
+ propertyPrefix
+ );
+ }
+
+ private void validateRetainDuration(Duration retainDuration)
+ {
+ Preconditions.checkArgument(
+ retainDuration != null && retainDuration.getMillis() >= 0,
+ "[%s.durationToRetain] must be 0 milliseconds or higher",
+ propertyPrefix
+ );
+ Preconditions.checkArgument(
+ retainDuration.getMillis() < System.currentTimeMillis(),
+ "[%s.durationToRetain] cannot be greater than current time in
milliseconds",
+ propertyPrefix
+ );
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
index 28f5c91049..ac37767327 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
@@ -129,6 +129,20 @@ public class Stats
= CoordinatorStat.toDebugAndEmit("groupRunTime",
"coordinator/global/time");
}
+ public static class Kill
+ {
+ public static final CoordinatorStat COMPACTION_CONFIGS
+ = CoordinatorStat.toDebugAndEmit("killedCompactConfigs",
"metadata/kill/compaction/count");
+ public static final CoordinatorStat SUPERVISOR_SPECS
+ = CoordinatorStat.toDebugAndEmit("killedSupervisorSpecs",
"metadata/kill/supervisor/count");
+ public static final CoordinatorStat RULES
+ = CoordinatorStat.toDebugAndEmit("killedRules",
"metadata/kill/rule/count");
+ public static final CoordinatorStat AUDIT_LOGS
+ = CoordinatorStat.toDebugAndEmit("killedAuditLogs",
"metadata/kill/audit/count");
+ public static final CoordinatorStat DATASOURCES
+ = CoordinatorStat.toDebugAndEmit("killedDatasources",
"metadata/kill/datasource/count");
+ }
+
public static class Balancer
{
public static final CoordinatorStat COMPUTATION_ERRORS =
CoordinatorStat.toLogAndEmit(
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
index 5f1e9dd7e8..d1df912698 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
@@ -139,7 +139,6 @@ public class BalanceSegmentsProfiler
.build()
)
.withSegmentAssignerUsing(loadQueueManager)
- .withEmitter(emitter)
.withDatabaseRuleManager(manager)
.build();
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 15fc5f5ac9..979181619a 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -690,7 +690,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
@Test
public void
testInitializeCompactSegmentsDutyWhenCustomDutyGroupDoesNotContainsCompactSegments()
{
- CoordinatorCustomDutyGroup group = new
CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1),
ImmutableList.of(new KillSupervisorsCustomDuty(new Duration("PT1S"), null)));
+ CoordinatorCustomDutyGroup group = new CoordinatorCustomDutyGroup(
+ "group1",
+ Duration.standardSeconds(1),
+ ImmutableList.of(new KillSupervisorsCustomDuty(new Duration("PT1S"),
null, druidCoordinatorConfig))
+ );
CoordinatorCustomDutyGroups customDutyGroups = new
CoordinatorCustomDutyGroups(ImmutableSet.of(group));
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
index 8480670160..bec2e3ed4b 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
@@ -20,14 +20,14 @@
package org.apache.druid.server.coordinator.duty;
import org.apache.druid.audit.AuditManager;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
@@ -43,13 +43,15 @@ public class KillAuditLogTest
@Mock
private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
- @Mock
- private ServiceEmitter mockServiceEmitter;
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
private KillAuditLog killAuditLog;
+ private CoordinatorRunStats runStats;
+
+ @Before
+ public void setup()
+ {
+ runStats = new CoordinatorRunStats();
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
+ }
@Test
public void testRunSkipIfLastRunLessThanPeriod()
@@ -69,7 +71,6 @@ public class KillAuditLogTest
@Test
public void testRunNotSkipIfLastRunMoreThanPeriod()
{
-
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
TestDruidCoordinatorConfig druidCoordinatorConfig = new
TestDruidCoordinatorConfig.Builder()
.withMetadataStoreManagementPeriod(new Duration("PT5s"))
.withCoordianatorAuditKillPeriod(new Duration("PT6S"))
@@ -80,7 +81,7 @@ public class KillAuditLogTest
killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
killAuditLog.run(mockDruidCoordinatorRuntimeParams);
Mockito.verify(mockAuditManager).removeAuditLogsOlderThan(ArgumentMatchers.anyLong());
-
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+ Assert.assertTrue(runStats.hasStat(Stats.Kill.AUDIT_LOGS));
}
@Test
@@ -93,9 +94,16 @@ public class KillAuditLogTest
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("coordinator audit kill period must be >=
druid.coordinator.period.metadataStoreManagementPeriod");
- killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+
+ final IllegalArgumentException exception = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> killAuditLog = new KillAuditLog(mockAuditManager,
druidCoordinatorConfig)
+ );
+ Assert.assertEquals(
+ "[druid.coordinator.kill.audit.period] must be greater than"
+ + " [druid.coordinator.period.metadataStoreManagementPeriod]",
+ exception.getMessage()
+ );
}
@Test
@@ -108,8 +116,13 @@ public class KillAuditLogTest
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("coordinator audit kill retainDuration must be >=
0");
- killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+ final IllegalArgumentException exception = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> killAuditLog = new KillAuditLog(mockAuditManager,
druidCoordinatorConfig)
+ );
+ Assert.assertEquals(
+ "[druid.coordinator.kill.audit.durationToRetain] must be 0
milliseconds or higher",
+ exception.getMessage()
+ );
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
index 916f3ed5e6..6d201aa9d6 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
@@ -25,8 +25,6 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
@@ -35,21 +33,19 @@ import
org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
@RunWith(MockitoJUnitRunner.class)
public class KillCompactionConfigTest
@@ -57,9 +53,6 @@ public class KillCompactionConfigTest
@Mock
private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
- @Mock
- private ServiceEmitter mockServiceEmitter;
-
@Mock
private SqlSegmentsMetadataManager mockSqlSegmentsMetadataManager;
@@ -72,15 +65,15 @@ public class KillCompactionConfigTest
@Mock
private MetadataStorageTablesConfig mockConnectorConfig;
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
private KillCompactionConfig killCompactionConfig;
+ private CoordinatorRunStats runStats;
@Before
public void setup()
{
+ runStats = new CoordinatorRunStats();
Mockito.when(mockConnectorConfig.getConfigTable()).thenReturn("druid_config");
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
}
@Test
@@ -102,7 +95,7 @@ public class KillCompactionConfigTest
killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
Mockito.verifyNoInteractions(mockSqlSegmentsMetadataManager);
Mockito.verifyNoInteractions(mockJacksonConfigManager);
- Mockito.verifyNoInteractions(mockServiceEmitter);
+ Assert.assertEquals(0, runStats.rowCount());
}
@Test
@@ -114,14 +107,21 @@ public class KillCompactionConfigTest
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Coordinator compaction configuration kill period
must be >= druid.coordinator.period.metadataStoreManagementPeriod");
- killCompactionConfig = new KillCompactionConfig(
- druidCoordinatorConfig,
- mockSqlSegmentsMetadataManager,
- mockJacksonConfigManager,
- mockConnector,
- mockConnectorConfig
+
+ final IllegalArgumentException exception = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> killCompactionConfig = new KillCompactionConfig(
+ druidCoordinatorConfig,
+ mockSqlSegmentsMetadataManager,
+ mockJacksonConfigManager,
+ mockConnector,
+ mockConnectorConfig
+ )
+ );
+ Assert.assertEquals(
+ "[druid.coordinator.kill.compaction.period] must be greater than"
+ + " [druid.coordinator.period.metadataStoreManagementPeriod]",
+ exception.getMessage()
);
}
@@ -129,7 +129,6 @@ public class KillCompactionConfigTest
@Test
public void testRunDoNothingIfCurrentConfigIsEmpty()
{
-
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
// Set current compaction config to an empty compaction config
Mockito.when(mockConnector.lookup(
ArgumentMatchers.anyString(),
@@ -158,10 +157,9 @@ public class KillCompactionConfigTest
);
killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
Mockito.verifyNoInteractions(mockSqlSegmentsMetadataManager);
- final ArgumentCaptor<ServiceEventBuilder> emittedEventCaptor =
ArgumentCaptor.forClass(ServiceEventBuilder.class);
- Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture());
- Assert.assertEquals(KillCompactionConfig.COUNT_METRIC,
emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric"));
- Assert.assertEquals(0,
emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
+ Assert.assertTrue(runStats.hasStat(Stats.Kill.COMPACTION_CONFIGS));
+ Assert.assertEquals(0, runStats.get(Stats.Kill.COMPACTION_CONFIGS));
+
Mockito.verify(mockJacksonConfigManager).convertByteToConfig(
ArgumentMatchers.eq(null),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
@@ -223,7 +221,6 @@ public class KillCompactionConfigTest
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(originalCurrentConfig);
-
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of(activeDatasourceName));
final ArgumentCaptor<byte[]> oldConfigCaptor =
ArgumentCaptor.forClass(byte[].class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor =
ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
@@ -257,11 +254,7 @@ public class KillCompactionConfigTest
Assert.assertEquals(1,
newConfigCaptor.getValue().getCompactionConfigs().size());
Assert.assertEquals(activeDatasourceConfig,
newConfigCaptor.getValue().getCompactionConfigs().get(0));
- final ArgumentCaptor<ServiceEventBuilder> emittedEventCaptor =
ArgumentCaptor.forClass(ServiceEventBuilder.class);
- Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture());
- Assert.assertEquals(KillCompactionConfig.COUNT_METRIC,
emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric"));
- // Should delete 1 config
- Assert.assertEquals(1,
emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
+ Assert.assertEquals(1, runStats.get(Stats.Kill.COMPACTION_CONFIGS));
Mockito.verify(mockJacksonConfigManager).convertByteToConfig(
ArgumentMatchers.eq(originalCurrentConfigBytes),
@@ -317,27 +310,20 @@ public class KillCompactionConfigTest
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(originalCurrentConfig);
-
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
Mockito.when(mockSqlSegmentsMetadataManager.retrieveAllDataSourceNames()).thenReturn(ImmutableSet.of());
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.any(byte[].class),
ArgumentMatchers.any(CoordinatorCompactionConfig.class),
ArgumentMatchers.any())
- ).thenAnswer(new Answer() {
- private int count = 0;
- @Override
- public Object answer(InvocationOnMock invocation)
- {
- if (count++ < 3) {
- // Return fail result with RetryableException the first three call
to updated set
- return ConfigManager.SetResult.fail(new Exception(), true);
- } else {
- // Return success ok on the fourth call to set updated config
- return ConfigManager.SetResult.ok();
- }
- }
- });
+ ).thenReturn(
+ // Return fail result with RetryableException the first three calls to
updated set
+ ConfigManager.SetResult.fail(new Exception(), true),
+ ConfigManager.SetResult.fail(new Exception(), true),
+ ConfigManager.SetResult.fail(new Exception(), true),
+ // Return success ok on the fourth call to set updated config
+ ConfigManager.SetResult.ok()
+ );
TestDruidCoordinatorConfig druidCoordinatorConfig = new
TestDruidCoordinatorConfig.Builder()
.withMetadataStoreManagementPeriod(new Duration("PT5S"))
@@ -354,12 +340,8 @@ public class KillCompactionConfigTest
);
killCompactionConfig.run(mockDruidCoordinatorRuntimeParams);
- // Verify and Assert
- final ArgumentCaptor<ServiceEventBuilder> emittedEventCaptor =
ArgumentCaptor.forClass(ServiceEventBuilder.class);
- Mockito.verify(mockServiceEmitter).emit(emittedEventCaptor.capture());
- Assert.assertEquals(KillCompactionConfig.COUNT_METRIC,
emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("metric"));
- // Should delete 1 config
- Assert.assertEquals(1,
emittedEventCaptor.getValue().build(ImmutableMap.of()).toMap().get("value"));
+ // Verify that 1 config has been deleted
+ Assert.assertEquals(1, runStats.get(Stats.Kill.COMPACTION_CONFIGS));
// Should call convertByteToConfig and lookup (to refresh current
compaction config) four times due to RetryableException when failed
Mockito.verify(mockJacksonConfigManager,
Mockito.times(4)).convertByteToConfig(
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
index 9e49603315..222d5a715f 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java
@@ -21,16 +21,15 @@ package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.metadata.MetadataSupervisorManager;
-import org.apache.druid.metadata.TestSupervisorSpec;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
@@ -49,16 +48,15 @@ public class KillDatasourceMetadataTest
@Mock
private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
- @Mock
- private TestSupervisorSpec mockKinesisSupervisorSpec;
-
- @Mock
- private ServiceEmitter mockServiceEmitter;
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
private KillDatasourceMetadata killDatasourceMetadata;
+ private CoordinatorRunStats runStats;
+
+ @Before
+ public void setup()
+ {
+ runStats = new CoordinatorRunStats();
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
+ }
@Test
public void testRunSkipIfLastRunLessThanPeriod()
@@ -79,7 +77,7 @@ public class KillDatasourceMetadataTest
@Test
public void testRunNotSkipIfLastRunMoreThanPeriod()
{
-
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
TestDruidCoordinatorConfig druidCoordinatorConfig = new
TestDruidCoordinatorConfig.Builder()
.withMetadataStoreManagementPeriod(new Duration("PT5S"))
@@ -88,10 +86,14 @@ public class KillDatasourceMetadataTest
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
- killDatasourceMetadata = new
KillDatasourceMetadata(druidCoordinatorConfig,
mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
+ killDatasourceMetadata = new KillDatasourceMetadata(
+ druidCoordinatorConfig,
+ mockIndexerMetadataStorageCoordinator,
+ mockMetadataSupervisorManager
+ );
killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(),
ArgumentMatchers.anySet());
-
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+ Assert.assertTrue(runStats.hasStat(Stats.Kill.DATASOURCES));
}
@Test
@@ -104,9 +106,20 @@ public class KillDatasourceMetadataTest
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Coordinator datasource metadata kill period must
be >= druid.coordinator.period.metadataStoreManagementPeriod");
- killDatasourceMetadata = new
KillDatasourceMetadata(druidCoordinatorConfig,
mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
+
+ final IllegalArgumentException exception = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> killDatasourceMetadata = new KillDatasourceMetadata(
+ druidCoordinatorConfig,
+ mockIndexerMetadataStorageCoordinator,
+ mockMetadataSupervisorManager
+ )
+ );
+ Assert.assertEquals(
+ "[druid.coordinator.kill.datasource.period] must be greater than"
+ + " [druid.coordinator.period.metadataStoreManagementPeriod]",
+ exception.getMessage()
+ );
}
@Test
@@ -119,16 +132,23 @@ public class KillDatasourceMetadataTest
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Coordinator datasource metadata kill
retainDuration must be >= 0");
- killDatasourceMetadata = new
KillDatasourceMetadata(druidCoordinatorConfig,
mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
+ final IllegalArgumentException exception = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> killDatasourceMetadata = new KillDatasourceMetadata(
+ druidCoordinatorConfig,
+ mockIndexerMetadataStorageCoordinator,
+ mockMetadataSupervisorManager
+ )
+ );
+ Assert.assertEquals(
+ "[druid.coordinator.kill.datasource.durationToRetain] must be 0
milliseconds or higher",
+ exception.getMessage()
+ );
}
@Test
public void testRunWithEmptyFilterExcludedDatasource()
{
-
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
-
TestDruidCoordinatorConfig druidCoordinatorConfig = new
TestDruidCoordinatorConfig.Builder()
.withMetadataStoreManagementPeriod(new Duration("PT5S"))
.withCoordinatorDatasourceKillPeriod(new Duration("PT6S"))
@@ -139,6 +159,6 @@ public class KillDatasourceMetadataTest
killDatasourceMetadata = new
KillDatasourceMetadata(druidCoordinatorConfig,
mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(),
ArgumentMatchers.eq(ImmutableSet.of()));
-
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+ Assert.assertTrue(runStats.hasStat(Stats.Kill.DATASOURCES));
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
index 6d98cfe0b5..f1663dffda 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java
@@ -19,16 +19,15 @@
package org.apache.druid.server.coordinator.duty;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.Duration;
+import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
@@ -44,18 +43,14 @@ public class KillRulesTest
@Mock
private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
- @Mock
- private ServiceEmitter mockServiceEmitter;
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
private KillRules killRules;
+ private CoordinatorRunStats runStats;
@Before
public void setup()
{
-
Mockito.when(mockDruidCoordinatorRuntimeParams.getDatabaseRuleManager()).thenReturn(mockRuleManager);
+ runStats = new CoordinatorRunStats();
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
}
@Test
@@ -68,7 +63,7 @@ public class KillRulesTest
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
- killRules = new KillRules(druidCoordinatorConfig);
+ killRules = new KillRules(druidCoordinatorConfig, mockRuleManager);
killRules.run(mockDruidCoordinatorRuntimeParams);
Mockito.verifyNoInteractions(mockRuleManager);
}
@@ -76,7 +71,6 @@ public class KillRulesTest
@Test
public void testRunNotSkipIfLastRunMoreThanPeriod()
{
-
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
TestDruidCoordinatorConfig druidCoordinatorConfig = new
TestDruidCoordinatorConfig.Builder()
.withMetadataStoreManagementPeriod(new Duration("PT5S"))
.withCoordinatorRuleKillPeriod(new Duration("PT6S"))
@@ -84,10 +78,10 @@ public class KillRulesTest
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
- killRules = new KillRules(druidCoordinatorConfig);
+ killRules = new KillRules(druidCoordinatorConfig, mockRuleManager);
killRules.run(mockDruidCoordinatorRuntimeParams);
Mockito.verify(mockRuleManager).removeRulesForEmptyDatasourcesOlderThan(ArgumentMatchers.anyLong());
-
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+ Assert.assertTrue(runStats.hasStat(Stats.Kill.RULES));
}
@Test
@@ -100,9 +94,15 @@ public class KillRulesTest
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("coordinator rule kill period must be >=
druid.coordinator.period.metadataStoreManagementPeriod");
- killRules = new KillRules(druidCoordinatorConfig);
+ final IllegalArgumentException exception = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> killRules = new KillRules(druidCoordinatorConfig,
mockRuleManager)
+ );
+ Assert.assertEquals(
+ "[druid.coordinator.kill.rule.period] must be greater than"
+ + " [druid.coordinator.period.metadataStoreManagementPeriod]",
+ exception.getMessage()
+ );
}
@Test
@@ -115,8 +115,13 @@ public class KillRulesTest
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("coordinator rule kill retainDuration must be >=
0");
- killRules = new KillRules(druidCoordinatorConfig);
+ final IllegalArgumentException exception = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> killRules = new KillRules(druidCoordinatorConfig,
mockRuleManager)
+ );
+ Assert.assertEquals(
+ "[druid.coordinator.kill.rule.durationToRetain] must be 0 milliseconds
or higher",
+ exception.getMessage()
+ );
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDutyTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDutyTest.java
index a07c4f676a..e93e04f1b3 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDutyTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsCustomDutyTest.java
@@ -19,15 +19,15 @@
package org.apache.druid.server.coordinator.duty;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.metadata.MetadataSupervisorManager;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.Duration;
import org.junit.Assert;
-import org.junit.Rule;
+import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
@@ -44,43 +44,70 @@ public class KillSupervisorsCustomDutyTest
private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
@Mock
- private ServiceEmitter mockServiceEmitter;
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
+ private DruidCoordinatorConfig coordinatorConfig;
private KillSupervisorsCustomDuty killSupervisors;
+ @Before
+ public void setup()
+ {
+
Mockito.when(coordinatorConfig.getCoordinatorMetadataStoreManagementPeriod())
+ .thenReturn(new Duration(3600 * 1000));
+ }
+
@Test
public void testConstructorFailIfRetainDurationNull()
{
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("(Custom Duty) Coordinator supervisor kill
retainDuration must be >= 0");
- killSupervisors = new KillSupervisorsCustomDuty(null,
mockMetadataSupervisorManager);
+ final IllegalArgumentException exception = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> killSupervisors = new KillSupervisorsCustomDuty(null,
mockMetadataSupervisorManager, coordinatorConfig)
+ );
+ Assert.assertEquals(
+ "[KillSupervisorsCustomDuty.durationToRetain] must be 0 milliseconds
or higher",
+ exception.getMessage()
+ );
}
@Test
public void testConstructorFailIfRetainDurationInvalid()
{
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("(Custom Duty) Coordinator supervisor kill
retainDuration must be >= 0");
- killSupervisors = new KillSupervisorsCustomDuty(new Duration("PT-1s"),
mockMetadataSupervisorManager);
+ final IllegalArgumentException exception = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> killSupervisors = new KillSupervisorsCustomDuty(
+ new Duration("PT-1S"),
+ mockMetadataSupervisorManager,
+ coordinatorConfig
+ )
+ );
+ Assert.assertEquals(
+ "[KillSupervisorsCustomDuty.durationToRetain] must be 0 milliseconds
or higher",
+ exception.getMessage()
+ );
}
@Test
public void testConstructorSuccess()
{
- killSupervisors = new KillSupervisorsCustomDuty(new Duration("PT1S"),
mockMetadataSupervisorManager);
+ killSupervisors = new KillSupervisorsCustomDuty(
+ new Duration("PT1S"),
+ mockMetadataSupervisorManager,
+ coordinatorConfig
+ );
Assert.assertNotNull(killSupervisors);
}
@Test
public void testRun()
{
-
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
- killSupervisors = new KillSupervisorsCustomDuty(new Duration("PT1S"),
mockMetadataSupervisorManager);
+ final CoordinatorRunStats runStats = new CoordinatorRunStats();
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
+ killSupervisors = new KillSupervisorsCustomDuty(
+ new Duration("PT1S"),
+ mockMetadataSupervisorManager,
+ coordinatorConfig
+ );
killSupervisors.run(mockDruidCoordinatorRuntimeParams);
Mockito.verify(mockMetadataSupervisorManager).removeTerminatedSupervisorsOlderThan(ArgumentMatchers.anyLong());
-
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+ Assert.assertTrue(runStats.hasStat(Stats.Kill.SUPERVISOR_SPECS));
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
index 0f7c2fff7e..bbe92a2cd1 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java
@@ -19,15 +19,15 @@
package org.apache.druid.server.coordinator.duty;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
@@ -43,13 +43,15 @@ public class KillSupervisorsTest
@Mock
private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
- @Mock
- private ServiceEmitter mockServiceEmitter;
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
private KillSupervisors killSupervisors;
+ private CoordinatorRunStats runStats;
+
+ @Before
+ public void setup()
+ {
+ runStats = new CoordinatorRunStats();
+
Mockito.when(mockDruidCoordinatorRuntimeParams.getCoordinatorStats()).thenReturn(runStats);
+ }
@Test
public void testRunSkipIfLastRunLessThanPeriod()
@@ -69,7 +71,6 @@ public class KillSupervisorsTest
@Test
public void testRunNotSkipIfLastRunMoreThanPeriod()
{
-
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
TestDruidCoordinatorConfig druidCoordinatorConfig = new
TestDruidCoordinatorConfig.Builder()
.withMetadataStoreManagementPeriod(new Duration("PT5S"))
.withCoordinatorSupervisorKillPeriod(new Duration("PT6S"))
@@ -80,7 +81,7 @@ public class KillSupervisorsTest
killSupervisors = new KillSupervisors(druidCoordinatorConfig,
mockMetadataSupervisorManager);
killSupervisors.run(mockDruidCoordinatorRuntimeParams);
Mockito.verify(mockMetadataSupervisorManager).removeTerminatedSupervisorsOlderThan(ArgumentMatchers.anyLong());
-
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+ Assert.assertTrue(runStats.hasStat(Stats.Kill.SUPERVISOR_SPECS));
}
@Test
@@ -93,9 +94,15 @@ public class KillSupervisorsTest
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Coordinator supervisor kill period must be >=
druid.coordinator.period.metadataStoreManagementPeriod");
- killSupervisors = new KillSupervisors(druidCoordinatorConfig,
mockMetadataSupervisorManager);
+ final IllegalArgumentException exception = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> killSupervisors = new KillSupervisors(druidCoordinatorConfig,
mockMetadataSupervisorManager)
+ );
+ Assert.assertEquals(
+ "[druid.coordinator.kill.supervisor.period] must be greater than"
+ + " [druid.coordinator.period.metadataStoreManagementPeriod]",
+ exception.getMessage()
+ );
}
@Test
@@ -108,8 +115,14 @@ public class KillSupervisorsTest
.withCoordinatorKillMaxSegments(10)
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Coordinator supervisor kill retainDuration must
be >= 0");
- killSupervisors = new KillSupervisors(druidCoordinatorConfig,
mockMetadataSupervisorManager);
+
+ final IllegalArgumentException exception = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> killSupervisors = new KillSupervisors(druidCoordinatorConfig,
mockMetadataSupervisorManager)
+ );
+ Assert.assertEquals(
+ "[druid.coordinator.kill.supervisor.durationToRetain] must be 0
milliseconds or higher",
+ exception.getMessage()
+ );
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]