This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d2b5cd  Add feature to automatically remove audit logs based on 
retention period (#11084)
6d2b5cd is described below

commit 6d2b5cdd7e080401bce64d352e3a65788ff110c4
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Tue Apr 20 17:10:43 2021 -0700

    Add feature to automatically remove audit logs based on retention period 
(#11084)
    
    * add docs
    
    * add impl
    
    * fix checkstyle
    
    * fix test
    
    * add test
    
    * fix checkstyle
    
    * fix checkstyle
    
    * fix test
    
    * Address comments
    
    * Address comments
    
    * fix spelling
    
    * fix docs
---
 .../java/org/apache/druid/audit/AuditManager.java  |   8 ++
 docs/configuration/index.md                        |   9 ++
 docs/operations/metrics.md                         |   2 +
 .../CoordinatorMetadataStoreManagementDuty.java    |  36 ++++++
 .../apache/druid/server/audit/SQLAuditManager.java |  19 +++
 .../druid/server/coordinator/DruidCoordinator.java |  26 ++++
 .../server/coordinator/DruidCoordinatorConfig.java |  12 ++
 .../server/coordinator/duty/KillAuditLog.java      |  81 ++++++++++++
 .../druid/server/audit/SQLAuditManagerTest.java    |  95 ++++++++++++--
 .../coordinator/CuratorDruidCoordinatorTest.java   |   5 +
 .../server/coordinator/DruidCoordinatorTest.java   |   5 +
 .../server/coordinator/HttpLoadQueuePeonTest.java  |   3 +
 .../server/coordinator/LoadQueuePeonTest.java      |   9 ++
 .../server/coordinator/LoadQueuePeonTester.java    |   3 +
 .../coordinator/TestDruidCoordinatorConfig.java    |  27 ++++
 .../server/coordinator/duty/KillAuditLogTest.java  | 139 +++++++++++++++++++++
 .../coordinator/duty/KillUnusedSegmentsTest.java   |   3 +
 .../java/org/apache/druid/cli/CliCoordinator.java  |  26 +++-
 website/.spelling                                  |   1 +
 19 files changed, 494 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/audit/AuditManager.java 
b/core/src/main/java/org/apache/druid/audit/AuditManager.java
index 73804d7..f403423 100644
--- a/core/src/main/java/org/apache/druid/audit/AuditManager.java
+++ b/core/src/main/java/org/apache/druid/audit/AuditManager.java
@@ -91,4 +91,12 @@ public interface AuditManager
    * @return list of AuditEntries satisfying the passed parameters
    */
   List<AuditEntry> fetchAuditHistory(String type, int limit);
+
+  /**
+   * Remove audit logs created older than the given timestamp.
+   *
+   * @param timestamp timestamp in milliseconds
+   * @return number of audit logs removed
+   */
+  int removeAuditLogsOlderThan(long timestamp);
 }
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index cd26a9e..d49bf75 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -742,6 +742,15 @@ These Coordinator static configurations can be defined in 
the `coordinator/runti
 |`druid.coordinator.asOverlord.enabled`|Boolean value for whether this 
Coordinator process should act like an Overlord as well. This configuration 
allows users to simplify a druid cluster by not having to deploy any standalone 
Overlord processes. If set to true, then Overlord console is available at 
`http://coordinator-host:port/console.html` and be sure to set 
`druid.coordinator.asOverlord.overlordService` also. See next.|false|
 |`druid.coordinator.asOverlord.overlordService`| Required, if 
`druid.coordinator.asOverlord.enabled` is `true`. This must be same value as 
`druid.service` on standalone Overlord processes and 
`druid.selectors.indexing.serviceName` on Middle Managers.|NULL|
 
+##### Metadata Management
+
+|Property|Description|Required?|Default|
+|--------|-----------|---------|-------|
+|`druid.coordinator.period.metadataStoreManagementPeriod`|How often to run 
metadata management tasks in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) 
duration format. |No | `PT1H`|
+|`druid.coordinator.kill.audit.on`| Boolean value for whether to enable 
automatic deletion of audit logs. If set to true, Coordinator will periodically 
remove audit logs from the audit table entries in metadata storage.| No | 
False| 
+|`druid.coordinator.kill.audit.period`| How often to do automatic deletion of 
audit logs in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration 
format. Value must be greater than 
`druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if 
`druid.coordinator.kill.audit.on` is set to True.| No| `P1D`|
+|`druid.coordinator.kill.audit.durationToRetain`| Duration of audit logs to be 
retained from created time in [ISO 
8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if 
`druid.coordinator.kill.audit.on` is set to True.| Yes if 
`druid.coordinator.kill.audit.on` is set to True| None|
+
 ##### Segment Management
 |Property|Possible Values|Description|Default|
 |--------|---------------|-----------|-------|
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 9a1d2c7..5a2d5fb 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -256,6 +256,8 @@ These metrics are for the Druid Coordinator and are reset 
each time the Coordina
 |`interval/skipCompact/count`|Total number of intervals of this datasource 
that are skipped (not eligible for auto compaction) by the auto 
compaction.|datasource.|Varies.|
 |`coordinator/time`|Approximate Coordinator duty runtime in milliseconds. The 
duty dimension is the string alias of the Duty that is being run.|duty.|Varies.|
 |`coordinator/global/time`|Approximate runtime of a full coordination cycle in 
milliseconds. The `dutyGroup` dimension indicates what type of coordination 
this run was. i.e. Historical Management vs Indexing|`dutyGroup`|Varies.|
+|`metadata/kill/audit/count`|Total number of audit logs automatically deleted 
from metadata store audit table per each Coordinator kill audit duty run. This 
metric can help adjust `druid.coordinator.kill.audit.durationToRetain` 
configuration based on if more or less audit logs need to be deleted per cycle. 
Note that this metric is only emitted when `druid.coordinator.kill.audit.on` is 
set to true.| |Varies.|
+
 
 If `emitBalancingStats` is set to `true` in the Coordinator [dynamic 
configuration](../configuration/index.md#dynamic-configuration), then [log 
entries](../configuration/logging.md) for class
 `org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics` will 
have extra information on balancing
diff --git 
a/server/src/main/java/org/apache/druid/guice/annotations/CoordinatorMetadataStoreManagementDuty.java
 
b/server/src/main/java/org/apache/druid/guice/annotations/CoordinatorMetadataStoreManagementDuty.java
new file mode 100644
index 0000000..47cde2b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/guice/annotations/CoordinatorMetadataStoreManagementDuty.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice.annotations;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ */
+@BindingAnnotation
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface CoordinatorMetadataStoreManagementDuty
+{
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java 
b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java
index 1302043..91a5a21 100644
--- a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java
+++ b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java
@@ -41,6 +41,7 @@ import org.joda.time.Interval;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.Update;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 
 import java.io.IOException;
@@ -229,6 +230,24 @@ public class SQLAuditManager implements AuditManager
     return fetchAuditHistoryLastEntries(null, type, limit);
   }
 
+  @Override
+  public int removeAuditLogsOlderThan(final long timestamp)
+  {
+    DateTime dateTime = DateTimes.utc(timestamp);
+    return dbi.withHandle(
+        handle -> {
+          Update sql = handle.createStatement(
+              StringUtils.format(
+                  "DELETE FROM %s WHERE created_date < :date_time",
+                  getAuditTable()
+              )
+          );
+          return sql.bind("date_time", dateTime.toString())
+                    .execute();
+        }
+    );
+  }
+
   private List<AuditEntry> fetchAuditHistoryLastEntries(final String key, 
final String type, int limit)
       throws IllegalArgumentException
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 17a3794..7da4fe6 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -47,6 +47,7 @@ import org.apache.druid.curator.discovery.ServiceAnnouncer;
 import org.apache.druid.discovery.DruidLeaderSelector;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
+import 
org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
@@ -150,6 +151,7 @@ public class DruidCoordinator
   private final ServiceAnnouncer serviceAnnouncer;
   private final DruidNode self;
   private final Set<CoordinatorDuty> indexingServiceDuties;
+  private final Set<CoordinatorDuty> metadataStoreManagementDuties;
   private final BalancerStrategyFactory factory;
   private final LookupCoordinatorManager lookupCoordinatorManager;
   private final DruidLeaderSelector coordLeaderSelector;
@@ -164,6 +166,7 @@ public class DruidCoordinator
   private ListeningExecutorService balancerExec;
 
   private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = 
"HistoricalManagementDuties";
+  private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = 
"MetadataStoreManagementDuties";
   private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = 
"IndexingServiceDuties";
   private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = 
"CompactSegmentsDuties";
 
@@ -182,6 +185,7 @@ public class DruidCoordinator
       LoadQueueTaskMaster taskMaster,
       ServiceAnnouncer serviceAnnouncer,
       @Self DruidNode self,
+      @CoordinatorMetadataStoreManagementDuty Set<CoordinatorDuty> 
metadataStoreManagementDuties,
       @CoordinatorIndexingServiceDuty Set<CoordinatorDuty> 
indexingServiceDuties,
       BalancerStrategyFactory factory,
       LookupCoordinatorManager lookupCoordinatorManager,
@@ -206,6 +210,7 @@ public class DruidCoordinator
         self,
         new ConcurrentHashMap<>(),
         indexingServiceDuties,
+        metadataStoreManagementDuties,
         factory,
         lookupCoordinatorManager,
         coordLeaderSelector,
@@ -230,6 +235,7 @@ public class DruidCoordinator
       DruidNode self,
       ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap,
       Set<CoordinatorDuty> indexingServiceDuties,
+      Set<CoordinatorDuty> metadataStoreManagementDuties,
       BalancerStrategyFactory factory,
       LookupCoordinatorManager lookupCoordinatorManager,
       DruidLeaderSelector coordLeaderSelector,
@@ -255,6 +261,7 @@ public class DruidCoordinator
     this.serviceAnnouncer = serviceAnnouncer;
     this.self = self;
     this.indexingServiceDuties = indexingServiceDuties;
+    this.metadataStoreManagementDuties = metadataStoreManagementDuties;
 
     this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
 
@@ -665,6 +672,12 @@ public class DruidCoordinator
             )
         );
       }
+      dutiesRunnables.add(
+          Pair.of(
+              new DutiesRunnable(makeMetadataStoreManagementDuties(), 
startingLeaderCounter, METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP),
+              config.getCoordinatorMetadataStoreManagementPeriod()
+          )
+      );
 
       for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : 
dutiesRunnables) {
         // CompactSegmentsDuty can takes a non trival amount of time to 
complete.
@@ -750,6 +763,19 @@ public class DruidCoordinator
     return ImmutableList.copyOf(duties);
   }
 
+  private List<CoordinatorDuty> makeMetadataStoreManagementDuties()
+  {
+    List<CoordinatorDuty> duties = ImmutableList.<CoordinatorDuty>builder()
+                                                
.addAll(metadataStoreManagementDuties)
+                                                .build();
+
+    log.debug(
+        "Done making metadata store management duties %s",
+        duties.stream().map(duty -> 
duty.getClass().getName()).collect(Collectors.toList())
+    );
+    return ImmutableList.copyOf(duties);
+  }
+
   private List<CoordinatorDuty> makeCompactSegmentsDuty()
   {
     return ImmutableList.of(compactSegments);
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
index 130d7e8..933b974 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
@@ -39,6 +39,10 @@ public abstract class DruidCoordinatorConfig
   @Default("PT1800s")
   public abstract Duration getCoordinatorIndexingPeriod();
 
+  @Config("druid.coordinator.period.metadataStoreManagementPeriod")
+  @Default("PT1H")
+  public abstract Duration getCoordinatorMetadataStoreManagementPeriod();
+
   @Config("druid.coordinator.kill.period")
   @Default("P1D")
   public abstract Duration getCoordinatorKillPeriod();
@@ -51,6 +55,14 @@ public abstract class DruidCoordinatorConfig
   @Default("0")
   public abstract int getCoordinatorKillMaxSegments();
 
+  @Config("druid.coordinator.kill.audit.period")
+  @Default("P1D")
+  public abstract Duration getCoordinatorAuditKillPeriod();
+
+  @Config("druid.coordinator.kill.audit.durationToRetain")
+  @Default("PT-1s")
+  public abstract Duration getCoordinatorAuditKillDurationToRetain();
+
   @Config("druid.coordinator.load.timeout")
   public Duration getLoadTimeoutDelay()
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
new file mode 100644
index 0000000..651cf6b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.druid.audit.AuditManager;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+
+public class KillAuditLog implements CoordinatorDuty
+{
+  private static final Logger log = new Logger(KillAuditLog.class);
+
+  private final long period;
+  private final long retainDuration;
+  private long lastKillTime = 0;
+
+  private final AuditManager auditManager;
+
+  @Inject
+  public KillAuditLog(
+      AuditManager auditManager,
+      DruidCoordinatorConfig config
+  )
+  {
+    this.period = config.getCoordinatorAuditKillPeriod().getMillis();
+    Preconditions.checkArgument(
+        this.period >= 
config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
+        "coordinator audit kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod"
+    );
+    this.retainDuration = 
config.getCoordinatorAuditKillDurationToRetain().getMillis();
+    Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit 
kill retainDuration must be >= 0");
+    log.debug(
+        "Audit Kill Task scheduling enabled with period [%s], retainDuration 
[%s]",
+        this.period,
+        this.retainDuration
+    );
+    this.auditManager = auditManager;
+  }
+
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
+  {
+    if ((lastKillTime + period) < System.currentTimeMillis()) {
+      lastKillTime = System.currentTimeMillis();
+
+      long timestamp = System.currentTimeMillis() - retainDuration;
+      int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp);
+      ServiceEmitter emitter = params.getEmitter();
+      emitter.emit(
+          new ServiceMetricEvent.Builder().build(
+              "metadata/kill/audit/count",
+              auditRemoved
+          )
+      );
+      log.info("Finished running KillAuditLog duty. Removed %,d audit logs", 
auditRemoved);
+    }
+    return params;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java 
b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
index 429402e..d336e84 100644
--- 
a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
@@ -128,15 +128,15 @@ public class SQLAuditManagerTest
   public void testAuditMetricEventBuilderConfig()
   {
     AuditEntry entry = new AuditEntry(
-            "testKey",
-            "testType",
-            new AuditInfo(
-                    "testAuthor",
-                    "testComment",
-                    "127.0.0.1"
-            ),
-            "testPayload",
-            DateTimes.of("2013-01-01T00:00:00Z")
+        "testKey",
+        "testType",
+        new AuditInfo(
+            "testAuthor",
+            "testComment",
+            "127.0.0.1"
+        ),
+        "testPayload",
+        DateTimes.of("2013-01-01T00:00:00Z")
     );
 
     SQLAuditManager auditManagerWithPayloadAsDimension = new SQLAuditManager(
@@ -257,6 +257,83 @@ public class SQLAuditManagerTest
   }
 
   @Test(timeout = 60_000L)
+  public void testRemoveAuditLogsOlderThanWithEntryOlderThanTime() throws 
IOException
+  {
+    String entry1Key = "testKey";
+    String entry1Type = "testType";
+    AuditInfo entry1AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
+    );
+    String entry1Payload = "testPayload";
+
+    auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, 
entry1Payload, stringConfigSerde);
+    byte[] payload = connector.lookup(
+        
derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+        "audit_key",
+        "payload",
+        "testKey"
+    );
+    AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
+    Assert.assertEquals(entry1Key, dbEntry.getKey());
+    Assert.assertEquals(entry1Payload, dbEntry.getPayload());
+    Assert.assertEquals(entry1Type, dbEntry.getType());
+    Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
+
+    // Do delete
+    auditManager.removeAuditLogsOlderThan(System.currentTimeMillis());
+    // Verify the delete
+    payload = connector.lookup(
+        
derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+        "audit_key",
+        "payload",
+        "testKey"
+    );
+    Assert.assertNull(payload);
+  }
+
+  @Test(timeout = 60_000L)
+  public void testRemoveAuditLogsOlderThanWithEntryNotOlderThanTime() throws 
IOException
+  {
+    String entry1Key = "testKey";
+    String entry1Type = "testType";
+    AuditInfo entry1AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
+    );
+    String entry1Payload = "testPayload";
+
+    auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, 
entry1Payload, stringConfigSerde);
+    byte[] payload = connector.lookup(
+        
derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+        "audit_key",
+        "payload",
+        "testKey"
+    );
+    AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
+    Assert.assertEquals(entry1Key, dbEntry.getKey());
+    Assert.assertEquals(entry1Payload, dbEntry.getPayload());
+    Assert.assertEquals(entry1Type, dbEntry.getType());
+    Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
+    // Do delete
+    
auditManager.removeAuditLogsOlderThan(DateTimes.of("2012-01-01T00:00:00Z").getMillis());
+    // Verify that entry was not delete
+    payload = connector.lookup(
+        
derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+        "audit_key",
+        "payload",
+        "testKey"
+    );
+    dbEntry = mapper.readValue(payload, AuditEntry.class);
+    Assert.assertEquals(entry1Key, dbEntry.getKey());
+    Assert.assertEquals(entry1Payload, dbEntry.getPayload());
+    Assert.assertEquals(entry1Type, dbEntry.getType());
+    Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
+  }
+
+  @Test(timeout = 60_000L)
   public void testFetchAuditHistoryByTypeWithLimit()
   {
     String entry1Key = "testKey";
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index dd382e3..a8deb66 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -169,8 +169,11 @@ public class CuratorDruidCoordinatorTest extends 
CuratorTestBase
         new Duration(COORDINATOR_PERIOD),
         null,
         null,
+        null,
         new Duration(COORDINATOR_PERIOD),
         null,
+        null,
+        null,
         10,
         new Duration("PT0s")
     );
@@ -247,6 +250,7 @@ public class CuratorDruidCoordinatorTest extends 
CuratorTestBase
         druidNode,
         loadManagementPeons,
         null,
+        null,
         new CostBalancerStrategyFactory(),
         EasyMock.createNiceMock(LookupCoordinatorManager.class),
         new TestDruidLeaderSelector(),
@@ -546,6 +550,7 @@ public class CuratorDruidCoordinatorTest extends 
CuratorTestBase
         druidNode,
         loadManagementPeons,
         null,
+        null,
         new CostBalancerStrategyFactory(),
         EasyMock.createNiceMock(LookupCoordinatorManager.class),
         new TestDruidLeaderSelector(),
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 36f1bf6..3aaef64 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -141,8 +141,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
         new Duration(COORDINATOR_PERIOD),
         null,
         null,
+        null,
         new Duration(COORDINATOR_PERIOD),
         null,
+        null,
+        null,
         10,
         new Duration("PT0s")
     );
@@ -212,6 +215,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         druidNode,
         loadManagementPeons,
         null,
+        new HashSet<>(),
         new CostBalancerStrategyFactory(),
         EasyMock.createNiceMock(LookupCoordinatorManager.class),
         new TestDruidLeaderSelector(),
@@ -738,6 +742,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         null,
         null,
         null,
+        null,
         ZkEnablementConfig.ENABLED
     );
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index 10d7ba2..7b07d7f 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -80,6 +80,9 @@ public class HttpLoadQueuePeonTest
       null,
       null,
       null,
+      null,
+      null,
+      null,
       10,
       Duration.ZERO
   )
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
index 4a2a25f..24e6806 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
@@ -95,6 +95,9 @@ public class LoadQueuePeonTest extends CuratorTestBase
             null,
             null,
             null,
+            null,
+            null,
+            null,
             10,
             Duration.millis(0)
         )
@@ -287,9 +290,12 @@ public class LoadQueuePeonTest extends CuratorTestBase
             null,
             null,
             null,
+            null,
             new Duration(1),
             null,
             null,
+            null,
+            null,
             10,
             new Duration("PT1s")
         )
@@ -339,9 +345,12 @@ public class LoadQueuePeonTest extends CuratorTestBase
             null,
             null,
             null,
+            null,
             new Duration(1),
             null,
             null,
+            null,
+            null,
             10,
             new Duration("PT1s")
         )
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
index fb92873..5185452 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
@@ -41,9 +41,12 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon
             null,
             null,
             null,
+            null,
             new Duration(1),
             null,
             null,
+            null,
+            null,
             10,
             new Duration("PT1s")
         )
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
index fef244a..135f8d0 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
@@ -27,9 +27,12 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
   private final Duration coordinatorStartDelay;
   private final Duration coordinatorPeriod;
   private final Duration coordinatorIndexingPeriod;
+  private final Duration metadataStoreManagementPeriod;
   private final Duration loadTimeoutDelay;
   private final Duration coordinatorKillPeriod;
   private final Duration coordinatorKillDurationToRetain;
+  private final Duration coordinatorAuditKillPeriod;
+  private final Duration coordinatorAuditKillDurationToRetain;
   private final Duration getLoadQueuePeonRepeatDelay;
   private final int coordinatorKillMaxSegments;
 
@@ -37,9 +40,12 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
       Duration coordinatorStartDelay,
       Duration coordinatorPeriod,
       Duration coordinatorIndexingPeriod,
+      Duration metadataStoreManagementPeriod,
       Duration loadTimeoutDelay,
       Duration coordinatorKillPeriod,
       Duration coordinatorKillDurationToRetain,
+      Duration coordinatorAuditKillPeriod,
+      Duration coordinatorAuditKillDurationToRetain,
       int coordinatorKillMaxSegments,
       Duration getLoadQueuePeonRepeatDelay
   )
@@ -47,9 +53,12 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
     this.coordinatorStartDelay = coordinatorStartDelay;
     this.coordinatorPeriod = coordinatorPeriod;
     this.coordinatorIndexingPeriod = coordinatorIndexingPeriod;
+    this.metadataStoreManagementPeriod = metadataStoreManagementPeriod;
     this.loadTimeoutDelay = loadTimeoutDelay;
     this.coordinatorKillPeriod = coordinatorKillPeriod;
     this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain;
+    this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod;
+    this.coordinatorAuditKillDurationToRetain = 
coordinatorAuditKillDurationToRetain;
     this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
     this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
   }
@@ -73,6 +82,12 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
   }
 
   @Override
+  public Duration getCoordinatorMetadataStoreManagementPeriod()
+  {
+    return metadataStoreManagementPeriod;
+  }
+
+  @Override
   public Duration getCoordinatorKillPeriod()
   {
     return coordinatorKillPeriod;
@@ -85,6 +100,18 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
   }
 
   @Override
+  public Duration getCoordinatorAuditKillPeriod()
+  {
+    return coordinatorAuditKillPeriod;
+  }
+
+  @Override
+  public Duration getCoordinatorAuditKillDurationToRetain()
+  {
+    return coordinatorAuditKillDurationToRetain;
+  }
+
+  @Override
   public int getCoordinatorKillMaxSegments()
   {
     return coordinatorKillMaxSegments;
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
new file mode 100644
index 0000000..b0f273c
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.audit.AuditManager;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KillAuditLogTest
+{
+  @Mock
+  private AuditManager mockAuditManager;
+
+  @Mock
+  private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams;
+
+  @Mock
+  private ServiceEmitter mockServiceEmitter;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  private KillAuditLog killAuditLog;
+
+  @Test
+  public void testRunSkipIfLastRunLessThanPeriod()
+  {
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new 
TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        new Duration(Long.MAX_VALUE),
+        new Duration("PT1S"),
+        10,
+        null
+    );
+    killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+    killAuditLog.run(mockDruidCoordinatorRuntimeParams);
+    Mockito.verifyZeroInteractions(mockAuditManager);
+  }
+
+  @Test
+  public void testRunNotSkipIfLastRunMoreThanPeriod()
+  {
+    
Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter);
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new 
TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        new Duration("PT6S"),
+        new Duration("PT1S"),
+        10,
+        null
+    );
+    killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+    killAuditLog.run(mockDruidCoordinatorRuntimeParams);
+    
Mockito.verify(mockAuditManager).removeAuditLogsOlderThan(ArgumentMatchers.anyLong());
+    
Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+  }
+
+  @Test
+  public void testConstructorFailIfInvalidPeriod()
+  {
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new 
TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        new Duration("PT3S"),
+        new Duration("PT1S"),
+        10,
+        null
+    );
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("coordinator audit kill period must be >= 
druid.coordinator.period.metadataStoreManagementPeriod");
+    killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+  }
+
+  @Test
+  public void testConstructorFailIfInvalidRetainDuration()
+  {
+    TestDruidCoordinatorConfig druidCoordinatorConfig = new 
TestDruidCoordinatorConfig(
+        null,
+        null,
+        null,
+        new Duration("PT5S"),
+        null,
+        null,
+        null,
+        new Duration("PT6S"),
+        new Duration("PT-1S"),
+        10,
+        null
+    );
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("coordinator audit kill retainDuration must be >= 
0");
+    killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 38aa78e..cd76666 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -105,9 +105,12 @@ public class KillUnusedSegmentsTest
             null,
             null,
             Duration.parse("PT76400S"),
+            null,
             new Duration(1),
             Duration.parse("PT86400S"),
             Duration.parse("PT86400S"),
+            null,
+            null,
             1000,
             Duration.ZERO
         )
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java 
b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 1876a28..9e3c65b 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -46,6 +46,7 @@ import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
+import 
org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.http.JettyHttpClientModule;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -71,6 +72,7 @@ import 
org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.KillStalePendingSegments;
 import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
+import org.apache.druid.server.coordinator.duty.KillAuditLog;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
 import org.apache.druid.server.http.ClusterResource;
 import org.apache.druid.server.http.CompactionResource;
@@ -214,14 +216,14 @@ public class CliCoordinator extends ServerRunnable
             LifecycleModule.register(binder, Server.class);
             LifecycleModule.register(binder, DataSourcesResource.class);
 
-            final ConditionalMultibind<CoordinatorDuty> conditionalMultibind = 
ConditionalMultibind.create(
+            // Binding for Set of indexing service coordinator Ddty
+            final ConditionalMultibind<CoordinatorDuty> 
conditionalIndexingServiceDutyMultibind = ConditionalMultibind.create(
                 properties,
                 binder,
                 CoordinatorDuty.class,
                 CoordinatorIndexingServiceDuty.class
             );
-
-            if 
(conditionalMultibind.matchCondition("druid.coordinator.merge.on", 
Predicates.equalTo("true"))) {
+            if 
(conditionalIndexingServiceDutyMultibind.matchCondition("druid.coordinator.merge.on",
 Predicates.equalTo("true"))) {
               throw new UnsupportedOperationException(
                   "'druid.coordinator.merge.on' is not supported anymore. "
                   + "Please consider using Coordinator's automatic compaction 
instead. "
@@ -230,19 +232,31 @@ public class CliCoordinator extends ServerRunnable
                   + "for more details about compaction."
               );
             }
-
-            conditionalMultibind.addConditionBinding(
+            conditionalIndexingServiceDutyMultibind.addConditionBinding(
                 "druid.coordinator.kill.on",
                 Predicates.equalTo("true"),
                 KillUnusedSegments.class
             );
-            conditionalMultibind.addConditionBinding(
+            conditionalIndexingServiceDutyMultibind.addConditionBinding(
                 "druid.coordinator.kill.pendingSegments.on",
                 "true",
                 Predicates.equalTo("true"),
                 KillStalePendingSegments.class
             );
 
+            // Binding for Set of metadata store management coordinator Ddty
+            final ConditionalMultibind<CoordinatorDuty> 
conditionalMetadataStoreManagementDutyMultibind = ConditionalMultibind.create(
+                properties,
+                binder,
+                CoordinatorDuty.class,
+                CoordinatorMetadataStoreManagementDuty.class
+            );
+            
conditionalMetadataStoreManagementDutyMultibind.addConditionBinding(
+                "druid.coordinator.kill.audit.on",
+                Predicates.equalTo("true"),
+                KillAuditLog.class
+            );
+
             bindNodeRoleAndAnnouncer(
                 binder,
                 Coordinator.class,
diff --git a/website/.spelling b/website/.spelling
index bd2ecfa..315e5ff 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1683,6 +1683,7 @@ PT1S
 PT24H
 PT300S
 PT30S
+PT3600S
 PT5M
 PT5S
 PT60S

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to