This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e2eded  Allow coordinator run auto compaction duty period to be 
configured separately from other indexing duties (#12263)
6e2eded is described below

commit 6e2eded277bec907466ce0b7823ce70fae6e388e
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Fri Feb 18 23:02:57 2022 -0800

    Allow coordinator run auto compaction duty period to be configured 
separately from other indexing duties (#12263)
    
    * add impl
    
    * add impl
    
    * add unit tests
    
    * add impl
    
    * add impl
    
    * add serde test
    
    * add tests
    
    * add docs
    
    * fix test
    
    * fix test
    
    * fix docs
    
    * fix docs
    
    * fix spelling
---
 docs/design/coordinator.md                         |   7 +
 .../druid/server/coordinator/DruidCoordinator.java |  47 ++++-
 .../server/coordinator/duty/CompactSegments.java   |  23 ++-
 .../coordinator/duty/CoordinatorCustomDuty.java    |   1 +
 .../server/coordinator/DruidCoordinatorTest.java   | 226 ++++++++++++++++++++-
 .../coordinator/TestDruidCoordinatorConfig.java    |  52 ++++-
 .../coordinator/duty/CompactSegmentsTest.java      |  24 +++
 7 files changed, 365 insertions(+), 15 deletions(-)

diff --git a/docs/design/coordinator.md b/docs/design/coordinator.md
index d5769a9..a3d33ec 100644
--- a/docs/design/coordinator.md
+++ b/docs/design/coordinator.md
@@ -98,6 +98,13 @@ Compaction tasks might fail due to the following reasons.
 
 Once a compaction task fails, the Coordinator simply checks the segments in 
the interval of the failed task again, and issues another compaction task in 
the next run.
 
+Note that the Compacting Segments Coordinator Duty is automatically enabled and 
runs as part of the Indexing Service Duties group. However, it can also be 
configured to run in isolation as a separate Coordinator duty group. This 
allows changing the period of the Compacting Segments Coordinator Duty without 
impacting the period of the other Indexing Service Duties. To do so, set the 
following properties (for more details see 
[custom pluggable Coordinator Duty [...]
+```
+druid.coordinator.dutyGroups=[<SOME_GROUP_NAME>]
+druid.coordinator.<SOME_GROUP_NAME>.duties=["compactSegments"]
+druid.coordinator.<SOME_GROUP_NAME>.period=<PERIOD_TO_RUN_COMPACTING_SEGMENTS_DUTY>
+```
+
 ### Segment search policy
 
 #### Recent segment first policy
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index c901cb9..8c060ee 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.coordinator;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
@@ -158,7 +159,7 @@ public class DruidCoordinator
   private final BalancerStrategyFactory factory;
   private final LookupCoordinatorManager lookupCoordinatorManager;
   private final DruidLeaderSelector coordLeaderSelector;
-
+  private final ObjectMapper objectMapper;
   private final CompactSegments compactSegments;
 
   private volatile boolean started = false;
@@ -194,7 +195,7 @@ public class DruidCoordinator
       BalancerStrategyFactory factory,
       LookupCoordinatorManager lookupCoordinatorManager,
       @Coordinator DruidLeaderSelector coordLeaderSelector,
-      CompactSegments compactSegments,
+      ObjectMapper objectMapper,
       ZkEnablementConfig zkEnablementConfig
   )
   {
@@ -219,7 +220,7 @@ public class DruidCoordinator
         factory,
         lookupCoordinatorManager,
         coordLeaderSelector,
-        compactSegments,
+        objectMapper,
         zkEnablementConfig
     );
   }
@@ -245,7 +246,7 @@ public class DruidCoordinator
       BalancerStrategyFactory factory,
       LookupCoordinatorManager lookupCoordinatorManager,
       DruidLeaderSelector coordLeaderSelector,
-      CompactSegments compactSegments,
+      ObjectMapper objectMapper,
       ZkEnablementConfig zkEnablementConfig
   )
   {
@@ -276,8 +277,8 @@ public class DruidCoordinator
     this.factory = factory;
     this.lookupCoordinatorManager = lookupCoordinatorManager;
     this.coordLeaderSelector = coordLeaderSelector;
-
-    this.compactSegments = compactSegments;
+    this.objectMapper = objectMapper;
+    this.compactSegments = initializeCompactSegmentsDuty();
   }
 
   public boolean isLeader()
@@ -769,14 +770,17 @@ public class DruidCoordinator
     );
   }
 
-  private List<CoordinatorDuty> makeIndexingServiceDuties()
+  @VisibleForTesting
+  List<CoordinatorDuty> makeIndexingServiceDuties()
   {
     List<CoordinatorDuty> duties = new ArrayList<>();
     duties.add(new LogUsedSegments());
     duties.addAll(indexingServiceDuties);
     // CompactSegmentsDuty should be the last duty as it can take a long time 
to complete
-    duties.addAll(makeCompactSegmentsDuty());
-
+    // We do not have to add compactSegments if it is already enabled in the 
custom duty group
+    if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) {
+      duties.addAll(makeCompactSegmentsDuty());
+    }
     log.debug(
         "Done making indexing service duties %s",
         duties.stream().map(duty -> 
duty.getClass().getName()).collect(Collectors.toList())
@@ -797,6 +801,31 @@ public class DruidCoordinator
     return ImmutableList.copyOf(duties);
   }
 
+  @VisibleForTesting
+  CompactSegments initializeCompactSegmentsDuty()
+  {
+    List<CompactSegments> compactSegmentsDutyFromCustomGroups = 
getCompactSegmentsDutyFromCustomGroups();
+    if (compactSegmentsDutyFromCustomGroups.isEmpty()) {
+      return new CompactSegments(config, objectMapper, indexingServiceClient);
+    } else {
+      if (compactSegmentsDutyFromCustomGroups.size() > 1) {
+        log.warn("More than one compactSegments duty is configured in the 
Coordinator Custom Duty Group. The first duty will be picked up.");
+      }
+      return compactSegmentsDutyFromCustomGroups.get(0);
+    }
+  }
+
+  @VisibleForTesting
+  List<CompactSegments> getCompactSegmentsDutyFromCustomGroups()
+  {
+    return customDutyGroups.getCoordinatorCustomDutyGroups()
+                           .stream()
+                           .flatMap(coordinatorCustomDutyGroup -> 
coordinatorCustomDutyGroup.getCustomDutyList().stream())
+                           .filter(duty -> duty instanceof CompactSegments)
+                           .map(duty -> (CompactSegments) duty)
+                           .collect(Collectors.toList());
+  }
+
   private List<CoordinatorDuty> makeCompactSegmentsDuty()
   {
     return ImmutableList.of(compactSegments);
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 6598a6e..e3f47dd 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.server.coordinator.duty;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
@@ -56,7 +58,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-public class CompactSegments implements CoordinatorDuty
+public class CompactSegments implements CoordinatorCustomDuty
 {
   static final String COMPACTION_TASK_COUNT = "compactTaskCount";
   static final String AVAILABLE_COMPACTION_TASK_SLOT = 
"availableCompactionTaskSlot";
@@ -90,10 +92,11 @@ public class CompactSegments implements CoordinatorDuty
   private final AtomicReference<Map<String, AutoCompactionSnapshot>> 
autoCompactionSnapshotPerDataSource = new AtomicReference<>();
 
   @Inject
+  @JsonCreator
   public CompactSegments(
-      DruidCoordinatorConfig config,
-      ObjectMapper objectMapper,
-      IndexingServiceClient indexingServiceClient
+      @JacksonInject DruidCoordinatorConfig config,
+      @JacksonInject ObjectMapper objectMapper,
+      @JacksonInject IndexingServiceClient indexingServiceClient
   )
   {
     this.policy = new NewestSegmentFirstPolicy(objectMapper);
@@ -104,6 +107,18 @@ public class CompactSegments implements CoordinatorDuty
     LOG.info("Scheduling compaction with skipLockedIntervals [%s]", 
skipLockedIntervals);
   }
 
+  @VisibleForTesting
+  public boolean isSkipLockedIntervals()
+  {
+    return skipLockedIntervals;
+  }
+
+  @VisibleForTesting
+  IndexingServiceClient getIndexingServiceClient()
+  {
+    return indexingServiceClient;
+  }
+
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java
index 86e2032..d481714 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java
@@ -51,6 +51,7 @@ import org.apache.druid.initialization.DruidModule;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes({
     @JsonSubTypes.Type(name = "killSupervisors", value = 
KillSupervisorsCustomDuty.class),
+    @JsonSubTypes.Type(name = "compactSegments", value = 
CompactSegments.class),
 })
 @ExtensionPoint
 public interface CoordinatorCustomDuty extends CoordinatorDuty
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index ec0201b..0363748 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -52,9 +52,12 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.duty.CompactSegments;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
+import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
+import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
 import 
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
 import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
 import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
@@ -74,6 +77,7 @@ import org.junit.Test;
 import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -735,7 +739,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
     EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory);
 
     DruidCoordinator c = new DruidCoordinator(
-        null,
+        druidCoordinatorConfig,
         null,
         configManager,
         null,
@@ -786,6 +790,226 @@ public class DruidCoordinatorTest extends CuratorTestBase
     Assert.assertFalse(firstExec == thirdExec);
   }
 
+  @Test
+  public void testCompactSegmentsDutyWhenCustomDutyGroupEmpty()
+  {
+    CoordinatorCustomDutyGroups emptyCustomDutyGroups = new 
CoordinatorCustomDutyGroups(ImmutableSet.of());
+    coordinator = new DruidCoordinator(
+        druidCoordinatorConfig,
+        new ZkPathsConfig()
+        {
+
+          @Override
+          public String getBase()
+          {
+            return "druid";
+          }
+        },
+        null,
+        segmentsMetadataManager,
+        serverInventoryView,
+        metadataRuleManager,
+        () -> curator,
+        serviceEmitter,
+        scheduledExecutorFactory,
+        null,
+        null,
+        new NoopServiceAnnouncer()
+        {
+          @Override
+          public void announce(DruidNode node)
+          {
+            // count down when this coordinator becomes the leader
+            leaderAnnouncerLatch.countDown();
+          }
+
+          @Override
+          public void unannounce(DruidNode node)
+          {
+            leaderUnannouncerLatch.countDown();
+          }
+        },
+        druidNode,
+        loadManagementPeons,
+        ImmutableSet.of(),
+        new HashSet<>(),
+        emptyCustomDutyGroups,
+        new CostBalancerStrategyFactory(),
+        EasyMock.createNiceMock(LookupCoordinatorManager.class),
+        new TestDruidLeaderSelector(),
+        null,
+        ZkEnablementConfig.ENABLED
+    );
+    // Since CompactSegments is not enabled in Custom Duty Group, 
CompactSegments must be created in IndexingServiceDuties
+    List<CoordinatorDuty> indexingDuties = 
coordinator.makeIndexingServiceDuties();
+    Assert.assertTrue(indexingDuties.stream().anyMatch(coordinatorDuty -> 
coordinatorDuty instanceof CompactSegments));
+
+    // CompactSegments should not exist in Custom Duty Group
+    List<CompactSegments> compactSegmentsDutyFromCustomGroups = 
coordinator.getCompactSegmentsDutyFromCustomGroups();
+    Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
+
+    // CompactSegments returned by this method should be created using the 
DruidCoordinatorConfig in the DruidCoordinator
+    CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
+    Assert.assertNotNull(duty);
+    
Assert.assertEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), 
duty.isSkipLockedIntervals());
+  }
+
+  @Test
+  public void 
testInitializeCompactSegmentsDutyWhenCustomDutyGroupDoesNotContainsCompactSegments()
+  {
+    CoordinatorCustomDutyGroup group = new 
CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), 
ImmutableList.of(new KillSupervisorsCustomDuty(new Duration("PT1S"), null)));
+    CoordinatorCustomDutyGroups customDutyGroups = new 
CoordinatorCustomDutyGroups(ImmutableSet.of(group));
+    coordinator = new DruidCoordinator(
+        druidCoordinatorConfig,
+        new ZkPathsConfig()
+        {
+
+          @Override
+          public String getBase()
+          {
+            return "druid";
+          }
+        },
+        null,
+        segmentsMetadataManager,
+        serverInventoryView,
+        metadataRuleManager,
+        () -> curator,
+        serviceEmitter,
+        scheduledExecutorFactory,
+        null,
+        null,
+        new NoopServiceAnnouncer()
+        {
+          @Override
+          public void announce(DruidNode node)
+          {
+            // count down when this coordinator becomes the leader
+            leaderAnnouncerLatch.countDown();
+          }
+
+          @Override
+          public void unannounce(DruidNode node)
+          {
+            leaderUnannouncerLatch.countDown();
+          }
+        },
+        druidNode,
+        loadManagementPeons,
+        ImmutableSet.of(),
+        new HashSet<>(),
+        customDutyGroups,
+        new CostBalancerStrategyFactory(),
+        EasyMock.createNiceMock(LookupCoordinatorManager.class),
+        new TestDruidLeaderSelector(),
+        null,
+        ZkEnablementConfig.ENABLED
+    );
+    // Since CompactSegments is not enabled in Custom Duty Group, 
CompactSegments must be created in IndexingServiceDuties
+    List<CoordinatorDuty> indexingDuties = 
coordinator.makeIndexingServiceDuties();
+    Assert.assertTrue(indexingDuties.stream().anyMatch(coordinatorDuty -> 
coordinatorDuty instanceof CompactSegments));
+
+    // CompactSegments should not exist in Custom Duty Group
+    List<CompactSegments> compactSegmentsDutyFromCustomGroups = 
coordinator.getCompactSegmentsDutyFromCustomGroups();
+    Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
+
+    // CompactSegments returned by this method should be created using the 
DruidCoordinatorConfig in the DruidCoordinator
+    CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
+    Assert.assertNotNull(duty);
+    
Assert.assertEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), 
duty.isSkipLockedIntervals());
+  }
+
+  @Test
+  public void 
testInitializeCompactSegmentsDutyWhenCustomDutyGroupContainsCompactSegments()
+  {
+    DruidCoordinatorConfig differentConfigUsedInCustomGroup = new 
TestDruidCoordinatorConfig(
+        new Duration(COORDINATOR_START_DELAY),
+        new Duration(COORDINATOR_PERIOD),
+        null,
+        null,
+        null,
+        new Duration(COORDINATOR_PERIOD),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        10,
+        new Duration("PT0s"),
+        false
+    );
+    CoordinatorCustomDutyGroup compactSegmentCustomGroup = new 
CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), 
ImmutableList.of(new CompactSegments(differentConfigUsedInCustomGroup, null, 
null)));
+    CoordinatorCustomDutyGroups customDutyGroups = new 
CoordinatorCustomDutyGroups(ImmutableSet.of(compactSegmentCustomGroup));
+    coordinator = new DruidCoordinator(
+        druidCoordinatorConfig,
+        new ZkPathsConfig()
+        {
+
+          @Override
+          public String getBase()
+          {
+            return "druid";
+          }
+        },
+        null,
+        segmentsMetadataManager,
+        serverInventoryView,
+        metadataRuleManager,
+        () -> curator,
+        serviceEmitter,
+        scheduledExecutorFactory,
+        null,
+        null,
+        new NoopServiceAnnouncer()
+        {
+          @Override
+          public void announce(DruidNode node)
+          {
+            // count down when this coordinator becomes the leader
+            leaderAnnouncerLatch.countDown();
+          }
+
+          @Override
+          public void unannounce(DruidNode node)
+          {
+            leaderUnannouncerLatch.countDown();
+          }
+        },
+        druidNode,
+        loadManagementPeons,
+        ImmutableSet.of(),
+        new HashSet<>(),
+        customDutyGroups,
+        new CostBalancerStrategyFactory(),
+        EasyMock.createNiceMock(LookupCoordinatorManager.class),
+        new TestDruidLeaderSelector(),
+        null,
+        ZkEnablementConfig.ENABLED
+    );
+    // Since CompactSegments is enabled in Custom Duty Group, 
CompactSegments must not be created in IndexingServiceDuties
+    List<CoordinatorDuty> indexingDuties = 
coordinator.makeIndexingServiceDuties();
+    Assert.assertTrue(indexingDuties.stream().noneMatch(coordinatorDuty -> 
coordinatorDuty instanceof CompactSegments));
+
+    // CompactSegments should exist in Custom Duty Group
+    List<CompactSegments> compactSegmentsDutyFromCustomGroups = 
coordinator.getCompactSegmentsDutyFromCustomGroups();
+    Assert.assertFalse(compactSegmentsDutyFromCustomGroups.isEmpty());
+    Assert.assertEquals(1, compactSegmentsDutyFromCustomGroups.size());
+    Assert.assertNotNull(compactSegmentsDutyFromCustomGroups.get(0));
+    Assert.assertTrue(compactSegmentsDutyFromCustomGroups.get(0) instanceof 
CompactSegments);
+
+    // CompactSegments returned by this method should be from the Custom Duty 
Group
+    CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
+    Assert.assertNotNull(duty);
+    
Assert.assertNotEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(),
 duty.isSkipLockedIntervals());
+    // We should get the CompactSegments duty from the custom duty group, which 
was created with a different config than the config in DruidCoordinator
+    
Assert.assertEquals(differentConfigUsedInCustomGroup.getCompactionSkipLockedIntervals(),
 duty.isSkipLockedIntervals());
+  }
+
   @Test(timeout = 3000)
   public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception
   {
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
index 196e205..abb500e 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
@@ -23,7 +23,6 @@ import org.joda.time.Duration;
 
 public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
 {
-
   private final Duration coordinatorStartDelay;
   private final Duration coordinatorPeriod;
   private final Duration coordinatorIndexingPeriod;
@@ -42,6 +41,7 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
   private final Duration coordinatorDatasourceKillDurationToRetain;
   private final Duration getLoadQueuePeonRepeatDelay;
   private final int coordinatorKillMaxSegments;
+  private final boolean compactionSkipLockedIntervals;
 
   public TestDruidCoordinatorConfig(
       Duration coordinatorStartDelay,
@@ -82,6 +82,50 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
     this.coordinatorDatasourceKillDurationToRetain = 
coordinatorDatasourceKillDurationToRetain;
     this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
     this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
+    this.compactionSkipLockedIntervals = true;
+  }
+
+  public TestDruidCoordinatorConfig(
+      Duration coordinatorStartDelay,
+      Duration coordinatorPeriod,
+      Duration coordinatorIndexingPeriod,
+      Duration metadataStoreManagementPeriod,
+      Duration loadTimeoutDelay,
+      Duration coordinatorKillPeriod,
+      Duration coordinatorKillDurationToRetain,
+      Duration coordinatorSupervisorKillPeriod,
+      Duration coordinatorSupervisorKillDurationToRetain,
+      Duration coordinatorAuditKillPeriod,
+      Duration coordinatorAuditKillDurationToRetain,
+      Duration coordinatorCompactionKillPeriod,
+      Duration coordinatorRuleKillPeriod,
+      Duration coordinatorRuleKillDurationToRetain,
+      Duration coordinatorDatasourceKillPeriod,
+      Duration coordinatorDatasourceKillDurationToRetain,
+      int coordinatorKillMaxSegments,
+      Duration getLoadQueuePeonRepeatDelay,
+      boolean compactionSkipLockedIntervals
+  )
+  {
+    this.coordinatorStartDelay = coordinatorStartDelay;
+    this.coordinatorPeriod = coordinatorPeriod;
+    this.coordinatorIndexingPeriod = coordinatorIndexingPeriod;
+    this.metadataStoreManagementPeriod = metadataStoreManagementPeriod;
+    this.loadTimeoutDelay = loadTimeoutDelay;
+    this.coordinatorKillPeriod = coordinatorKillPeriod;
+    this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain;
+    this.coordinatorSupervisorKillPeriod = coordinatorSupervisorKillPeriod;
+    this.coordinatorSupervisorKillDurationToRetain = 
coordinatorSupervisorKillDurationToRetain;
+    this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod;
+    this.coordinatorAuditKillDurationToRetain = 
coordinatorAuditKillDurationToRetain;
+    this.coordinatorCompactionKillPeriod = coordinatorCompactionKillPeriod;
+    this.coordinatorRuleKillPeriod = coordinatorRuleKillPeriod;
+    this.coordinatorRuleKillDurationToRetain = 
coordinatorRuleKillDurationToRetain;
+    this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod;
+    this.coordinatorDatasourceKillDurationToRetain = 
coordinatorDatasourceKillDurationToRetain;
+    this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
+    this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
+    this.compactionSkipLockedIntervals = compactionSkipLockedIntervals;
   }
 
   @Override
@@ -191,4 +235,10 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
   {
     return getLoadQueuePeonRepeatDelay;
   }
+
+  @Override
+  public boolean getCompactionSkipLockedIntervals()
+  {
+    return compactionSkipLockedIntervals;
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 2892d44..bd4e601 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator.duty;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -38,6 +39,7 @@ import 
org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
 import org.apache.druid.client.indexing.ClientTaskQuery;
 import org.apache.druid.client.indexing.HttpIndexingServiceClient;
+import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
 import org.apache.druid.client.indexing.IndexingWorker;
 import org.apache.druid.client.indexing.IndexingWorkerInfo;
@@ -238,6 +240,28 @@ public class CompactSegmentsTest
   }
 
   @Test
+  public void testSerde() throws Exception
+  {
+    final TestDruidLeaderClient leaderClient = new 
TestDruidLeaderClient(JSON_MAPPER);
+    final HttpIndexingServiceClient indexingServiceClient = new 
HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+
+    JSON_MAPPER.setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(DruidCoordinatorConfig.class, COORDINATOR_CONFIG)
+            .addValue(ObjectMapper.class, JSON_MAPPER)
+            .addValue(IndexingServiceClient.class, indexingServiceClient)
+    );
+
+    final CompactSegments compactSegments = new 
CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    String compactSegmentString = 
JSON_MAPPER.writeValueAsString(compactSegments);
+    CompactSegments serdeCompactSegments = 
JSON_MAPPER.readValue(compactSegmentString, CompactSegments.class);
+
+    Assert.assertNotNull(serdeCompactSegments);
+    Assert.assertEquals(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals(), 
serdeCompactSegments.isSkipLockedIntervals());
+    Assert.assertEquals(indexingServiceClient, 
serdeCompactSegments.getIndexingServiceClient());
+  }
+
+  @Test
   public void testRun()
   {
     final TestDruidLeaderClient leaderClient = new 
TestDruidLeaderClient(JSON_MAPPER);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to