kfaraz commented on code in PR #14631:
URL: https://github.com/apache/druid/pull/14631#discussion_r1283944769


##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillCompactionConfig.java:
##########
@@ -72,99 +66,85 @@ public KillCompactionConfig(
       MetadataStorageTablesConfig connectorConfig
   )
   {
+    super(
+        "compaction configs",
+        "druid.coordinator.kill.compaction",
+        config.getCoordinatorCompactionKillPeriod(),
+        Duration.millis(1), // Retain duration is ignored
+        Stats.Kill.COMPACTION_CONFIGS,
+        config
+    );
     this.sqlSegmentsMetadataManager = sqlSegmentsMetadataManager;
     this.jacksonConfigManager = jacksonConfigManager;
-    this.period = config.getCoordinatorCompactionKillPeriod().getMillis();
     this.connector = connector;
     this.connectorConfig = connectorConfig;
-    Preconditions.checkArgument(
-        this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
-        "Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"
-    );
-    log.debug(
-        "Compaction Configuration Kill Task scheduling enabled with period [%s]",
-        this.period
-    );
   }
 
   @Override
-  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+  protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime)
   {
-    long currentTimeMillis = System.currentTimeMillis();
-    if ((lastKillTime + period) < currentTimeMillis) {
-      lastKillTime = currentTimeMillis;
-      try {
-        RetryUtils.retry(
-            () -> {
-              final byte[] currentBytes = CoordinatorCompactionConfig.getConfigInByteFromDb(connector, connectorConfig);
-              final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.convertByteToConfig(jacksonConfigManager, currentBytes);
-              // If current compaction config is empty then there is nothing to do
-              if (CoordinatorCompactionConfig.empty().equals(current)) {
-                log.info(
-                    "Finished running KillCompactionConfig duty. Nothing to do as compaction config is already empty.");
-                emitMetric(params.getEmitter(), 0);
-                return ConfigManager.SetResult.ok();
-              }
-
-              // Get all active datasources
-              // Note that we get all active datasources after getting compaction config to prevent race condition if new
-              // datasource and config are added.
-              Set<String> activeDatasources = sqlSegmentsMetadataManager.retrieveAllDataSourceNames();
-              final Map<String, DataSourceCompactionConfig> updated = current
-                  .getCompactionConfigs()
-                  .stream()
-                  .filter(dataSourceCompactionConfig -> activeDatasources.contains(dataSourceCompactionConfig.getDataSource()))
-                  .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
-
-              // Calculate number of compaction configs to remove for logging
-              int compactionConfigRemoved = current.getCompactionConfigs().size() - updated.size();
-
-              ConfigManager.SetResult result = jacksonConfigManager.set(
-                  CoordinatorCompactionConfig.CONFIG_KEY,
-                  currentBytes,
-                  CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(updated.values())),
-                  new AuditInfo(
-                      "KillCompactionConfig",
-                      "CoordinatorDuty for automatic deletion of compaction config",
-                      ""
-                  )
-              );
-              if (result.isOk()) {
-                log.info(
-                    "Finished running KillCompactionConfig duty. Removed %,d compaction configs",
-                    compactionConfigRemoved
-                );
-                emitMetric(params.getEmitter(), compactionConfigRemoved);
-              } else if (result.isRetryable()) {
-                // Failed but is retryable
-                log.debug("Retrying KillCompactionConfig duty");
-                throw new RetryableException(result.getException());
-              } else {
-                // Failed and not retryable
-                log.error(result.getException(), "Failed to kill compaction configurations");
-                emitMetric(params.getEmitter(), 0);

Review Comment:
   Yes, all metrics collected in any coordinator duty are emitted in `DruidCoordinator.DutiesRunnable.emitStat`. For these duties, the metrics are collected in `MetadataCleanupDuty.run()` using `params.getCoordinatorStats().add`.
   
   One of the motivations of this PR was to collect metrics everywhere and emit them from just one place in `DruidCoordinator`. Other coordinator duties, such as `RunRules` and `BalanceSegments`, already follow this pattern.
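
   The collect-then-emit pattern described above could be sketched roughly as follows. Note that `CoordinatorStats` here is a simplified stand-in for illustration, not Druid's actual class, and the metric name is hypothetical:

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Simplified stand-in for a shared stats accumulator: each duty adds to it,
   // and a single caller emits everything at the end of the run.
   class CoordinatorStats
   {
     private final Map<String, Long> stats = new ConcurrentHashMap<>();

     // Duties call this instead of emitting metrics themselves.
     void add(String statName, long value)
     {
       stats.merge(statName, value, Long::sum);
     }

     long get(String statName)
     {
       return stats.getOrDefault(statName, 0L);
     }

     // The one emit point, analogous to emitting stats from DruidCoordinator.
     void emitAll()
     {
       stats.forEach((name, value) -> System.out.println("metric=" + name + " value=" + value));
     }
   }

   public class Main
   {
     public static void main(String[] args)
     {
       final CoordinatorStats stats = new CoordinatorStats();
       // Two duties (or two invocations) contribute to the same stat.
       stats.add("kill/compactionConfig/count", 3);
       stats.add("kill/compactionConfig/count", 2);
       stats.emitAll();
     }
   }
   ```

   The advantage of this shape is that duties stay side-effect free with respect to the emitter, and the emission format can be changed in one place.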



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
