This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 133054bf27 Make batched segment sampling the default, minor cleanup of 
coordinator config (#13391)
133054bf27 is described below

commit 133054bf27dd680fb281e440a7a9123d6e7818c5
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Nov 21 20:31:46 2022 +0530

    Make batched segment sampling the default, minor cleanup of coordinator 
config (#13391)
    
    The batch segment sampling performs significantly better than the older 
method
    of sampling if there are a large number of used segments. It also avoids 
duplicates.
    
    Changes:
    - Make batch segment sampling the default
    - Deprecate the property `useBatchedSegmentSampler`
    - Remove unused coordinator config 
`druid.coordinator.loadqueuepeon.repeatDelay`
    - Cleanup `KillUnusedSegments`
    - Simplify `KillUnusedSegmentsTest`, add better tests, remove redundant 
tests
---
 .../druid/server/coordinator/BalancerStrategy.java |   8 +-
 .../coordinator/CoordinatorDynamicConfig.java      |  16 +-
 .../server/coordinator/DruidCoordinatorConfig.java |   6 -
 .../coordinator/duty/KillUnusedSegments.java       | 103 +++--
 .../server/coordinator/BalanceSegmentsTest.java    |  15 +-
 .../coordinator/CuratorDruidCoordinatorTest.java   |   1 -
 .../coordinator/DruidCoordinatorConfigTest.java    |   2 -
 .../server/coordinator/DruidCoordinatorTest.java   |   2 -
 .../server/coordinator/HttpLoadQueuePeonTest.java  |   1 -
 .../server/coordinator/LoadQueuePeonTest.java      |   3 -
 .../server/coordinator/LoadQueuePeonTester.java    |   1 -
 .../coordinator/ReservoirSegmentSamplerTest.java   | 347 +++++++++--------
 .../coordinator/TestDruidCoordinatorConfig.java    |  18 -
 .../coordinator/duty/KillUnusedSegmentsTest.java   | 414 +++++++++------------
 .../simulate/CoordinatorSimulationBuilder.java     |   1 -
 .../server/http/CoordinatorDynamicConfigTest.java  |  24 +-
 .../coordinator-dynamic-config.tsx                 |   2 +-
 17 files changed, 439 insertions(+), 525 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
index 6478614ba5..4a48137a88 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
@@ -105,9 +105,7 @@ public interface BalancerStrategy
   }
 
   /**
-   * Pick the best segments to move from one of the supplied set of servers 
according to the balancing strategy. This
-   * is the deprecated way of picking a segment to move. 
pickSegmentsToMove(List<ServerHoler>, Set<String>, int) uses
-   * a more performant bathced sampling method that will become the default 
picking mode in the future.
+   * Pick the best segments to move from one of the supplied set of servers 
according to the balancing strategy.
    *
    * @param serverHolders set of historicals to consider for moving segments
    * @param broadcastDatasources Datasources that contain segments which were 
loaded via broadcast rules.
@@ -122,6 +120,10 @@ public interface BalancerStrategy
    *                                    for implementations of this method.
    * @return Iterator for set of {@link BalancerSegmentHolder} containing 
segment to move and server they currently
    * reside on, or empty if there are no segments to pick from (i. e. all 
provided serverHolders are empty).
+   *
+   * @deprecated Use {@link #pickSegmentsToMove(List, Set, int)} instead as it 
is
+   * a much more performant sampling method which does not allow duplicates. 
This
+   * method will be removed in future releases.
    */
   @Deprecated
   default Iterator<BalancerSegmentHolder> pickSegmentsToMove(
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index 475a8f88cd..b9f0d490a3 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -56,6 +56,7 @@ public class CoordinatorDynamicConfig
   private final int maxSegmentsToMove;
   @Deprecated
   private final double percentOfSegmentsToConsiderPerMove;
+  @Deprecated
   private final boolean useBatchedSegmentSampler;
   private final int replicantLifetime;
   private final int replicationThrottleLimit;
@@ -115,7 +116,7 @@ public class CoordinatorDynamicConfig
       @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
       @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
       @Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") 
@Nullable Double percentOfSegmentsToConsiderPerMove,
-      @JsonProperty("useBatchedSegmentSampler") boolean 
useBatchedSegmentSampler,
+      @Deprecated @JsonProperty("useBatchedSegmentSampler") Boolean 
useBatchedSegmentSampler,
       @JsonProperty("replicantLifetime") int replicantLifetime,
       @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
       @JsonProperty("balancerComputeThreads") int balancerComputeThreads,
@@ -161,7 +162,12 @@ public class CoordinatorDynamicConfig
     );
     this.percentOfSegmentsToConsiderPerMove = 
percentOfSegmentsToConsiderPerMove;
 
-    this.useBatchedSegmentSampler = useBatchedSegmentSampler;
+    if (useBatchedSegmentSampler == null) {
+      this.useBatchedSegmentSampler = 
Builder.DEFAULT_USE_BATCHED_SEGMENT_SAMPLER;
+    } else {
+      this.useBatchedSegmentSampler = useBatchedSegmentSampler;
+    }
+
     this.replicantLifetime = replicantLifetime;
     this.replicationThrottleLimit = replicationThrottleLimit;
     this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
@@ -276,6 +282,7 @@ public class CoordinatorDynamicConfig
     return percentOfSegmentsToConsiderPerMove;
   }
 
+  @Deprecated
   @JsonProperty
   public boolean useBatchedSegmentSampler()
   {
@@ -517,7 +524,7 @@ public class CoordinatorDynamicConfig
     private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10;
     private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1;
     private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
-    private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false;
+    private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = true;
     private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
     private static final int 
DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
     private static final boolean DEFAULT_PAUSE_COORDINATION = false;
@@ -557,7 +564,7 @@ public class CoordinatorDynamicConfig
         @JsonProperty("mergeSegmentsLimit") @Nullable Integer 
mergeSegmentsLimit,
         @JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove,
         @Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") 
@Nullable Double percentOfSegmentsToConsiderPerMove,
-        @JsonProperty("useBatchedSegmentSampler") Boolean 
useBatchedSegmentSampler,
+        @Deprecated @JsonProperty("useBatchedSegmentSampler") Boolean 
useBatchedSegmentSampler,
         @JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime,
         @JsonProperty("replicationThrottleLimit") @Nullable Integer 
replicationThrottleLimit,
         @JsonProperty("balancerComputeThreads") @Nullable Integer 
balancerComputeThreads,
@@ -627,6 +634,7 @@ public class CoordinatorDynamicConfig
       return this;
     }
 
+    @Deprecated
     public Builder withUseBatchedSegmentSampler(boolean 
useBatchedSegmentSampler)
     {
       this.useBatchedSegmentSampler = useBatchedSegmentSampler;
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
index cdd708ea9d..9495f30f2c 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
@@ -101,12 +101,6 @@ public abstract class DruidCoordinatorConfig
     return new Duration(15 * 60 * 1000);
   }
 
-  @Config("druid.coordinator.loadqueuepeon.repeatDelay")
-  public Duration getLoadQueuePeonRepeatDelay()
-  {
-    return Duration.millis(50);
-  }
-
   @Config("druid.coordinator.loadqueuepeon.type")
   public String getLoadQueuePeonType()
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 2ab3762ba9..f1659b3335 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import org.apache.druid.client.indexing.IndexingServiceClient;
@@ -33,7 +32,6 @@ import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
-import javax.annotation.Nullable;
 import java.util.Collection;
 import java.util.List;
 
@@ -43,7 +41,7 @@ import java.util.List;
  * negative meaning the interval end target will be in the future. Also, 
retainDuration can be ignored,
  * meaning that there is no upper bound to the end interval of segments that 
will be killed. This action is called
  * "to kill a segment".
- *
+ * <p>
  * See org.apache.druid.indexing.common.task.KillUnusedSegmentsTask.
  */
 public class KillUnusedSegments implements CoordinatorDuty
@@ -102,74 +100,69 @@ public class KillUnusedSegments implements CoordinatorDuty
     Collection<String> dataSourcesToKill =
         
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
 
+    // If no datasource has been specified, all are eligible for killing 
unused segments
     if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
       dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
     }
 
-    if (dataSourcesToKill != null &&
-        dataSourcesToKill.size() > 0 &&
-        (lastKillTime + period) < System.currentTimeMillis()) {
-      lastKillTime = System.currentTimeMillis();
-
-      for (String dataSource : dataSourcesToKill) {
-        final Interval intervalToKill = findIntervalForKill(dataSource, 
maxSegmentsToKill);
-        if (intervalToKill != null) {
-          try {
-            indexingServiceClient.killUnusedSegments("coordinator-issued", 
dataSource, intervalToKill);
-          }
-          catch (Exception ex) {
-            log.error(ex, "Failed to submit kill task for dataSource [%s]", 
dataSource);
-            if (Thread.currentThread().isInterrupted()) {
-              log.warn("skipping kill task scheduling because thread is 
interrupted.");
-              break;
-            }
-          }
-        }
-      }
+    final long currentTimeMillis = System.currentTimeMillis();
+    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+      log.debug("No eligible datasource to kill unused segments.");
+    } else if (lastKillTime + period > currentTimeMillis) {
+      log.debug("Skipping kill of unused segments as kill period has not 
elapsed yet.");
+    } else {
+      log.debug("Killing unused segments in datasources: %s", 
dataSourcesToKill);
+      lastKillTime = currentTimeMillis;
+      killUnusedSegments(dataSourcesToKill);
     }
+
     return params;
   }
 
-  /**
-   * For a given datasource and limit of segments that can be killed in one 
task, determine the interval to be
-   * submitted with the kill task.
-   *
-   * @param dataSource dataSource whose unused segments are being killed.
-   * @param limit the maximum number of segments that can be included in the 
kill task.
-   * @return {@link Interval} to be used in the kill task.
-   */
-  @VisibleForTesting
-  @Nullable
-  Interval findIntervalForKill(String dataSource, int limit)
+  private void killUnusedSegments(Collection<String> dataSourcesToKill)
   {
-    List<Interval> unusedSegmentIntervals =
-        segmentsMetadataManager.getUnusedSegmentIntervals(dataSource, 
getEndTimeUpperLimit(), limit);
+    int submittedTasks = 0;
+    for (String dataSource : dataSourcesToKill) {
+      final Interval intervalToKill = findIntervalForKill(dataSource);
+      if (intervalToKill == null) {
+        continue;
+      }
 
-    if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) {
-      return JodaUtils.umbrellaInterval(unusedSegmentIntervals);
-    } else {
-      return null;
+      try {
+        indexingServiceClient.killUnusedSegments("coordinator-issued", 
dataSource, intervalToKill);
+        ++submittedTasks;
+      }
+      catch (Exception ex) {
+        log.error(ex, "Failed to submit kill task for dataSource [%s]", 
dataSource);
+        if (Thread.currentThread().isInterrupted()) {
+          log.warn("skipping kill task scheduling because thread is 
interrupted.");
+          break;
+        }
+      }
     }
+
+    log.debug("Submitted kill tasks for [%d] datasources.", submittedTasks);
   }
 
   /**
-   * Calculate the {@link DateTime} that wil form the upper bound when looking 
for segments that are
-   * eligible to be killed. If ignoreDurationToRetain is true, we have no 
upper bound and return a DateTime object
-   * for "max" time that works when comparing date strings.
-   *
-   * @return {@link DateTime} representing the upper bound time used when 
looking for segments to kill.
+   * Calculates the interval for which segments are to be killed in a 
datasource.
    */
-  @VisibleForTesting
-  DateTime getEndTimeUpperLimit()
+  private Interval findIntervalForKill(String dataSource)
   {
-    return ignoreRetainDuration
-           ? DateTimes.COMPARE_DATE_AS_STRING_MAX
-           : DateTimes.nowUtc().minus(retainDuration);
-  }
+    final DateTime maxEndTime = ignoreRetainDuration
+                                ? DateTimes.COMPARE_DATE_AS_STRING_MAX
+                                : DateTimes.nowUtc().minus(retainDuration);
 
-  @VisibleForTesting
-  Long getRetainDuration()
-  {
-    return retainDuration;
+    List<Interval> unusedSegmentIntervals = segmentsMetadataManager
+        .getUnusedSegmentIntervals(dataSource, maxEndTime, maxSegmentsToKill);
+
+    if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
+      return null;
+    } else if (unusedSegmentIntervals.size() == 1) {
+      return unusedSegmentIntervals.get(0);
+    } else {
+      return JodaUtils.umbrellaInterval(unusedSegmentIntervals);
+    }
   }
+
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
index a7c594fe09..e2bb7a816b 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
@@ -238,7 +238,7 @@ public class BalanceSegmentsTest
                 new ServerHolder(druidServer2, peon2, true)
             ),
             broadcastDatasources,
-            100.0
+            2
         )
     ).andReturn(
         ImmutableList.of(
@@ -247,7 +247,7 @@ public class BalanceSegmentsTest
         ).iterator()
     );
 
-    EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyDouble()))
+    EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyInt()))
             .andReturn(
                 ImmutableList.of(
                     new BalancerSegmentHolder(druidServer1, segment1),
@@ -297,7 +297,7 @@ public class BalanceSegmentsTest
     DruidCoordinatorRuntimeParams params = 
setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(10);
     params = new BalanceSegmentsTester(coordinator).run(params);
     Assert.assertEquals(1L, 
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
-    Assert.assertEquals(ImmutableSet.of(segment2), peon3.getSegmentsToLoad());
+    Assert.assertEquals(ImmutableSet.of(segment1), peon3.getSegmentsToLoad());
   }
 
   /**
@@ -316,7 +316,7 @@ public class BalanceSegmentsTest
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
     EasyMock.expect(
-        strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyDouble()))
+        strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyInt()))
             .andReturn(
                 ImmutableList.of(
                     new BalancerSegmentHolder(druidServer1, segment2),
@@ -367,7 +367,7 @@ public class BalanceSegmentsTest
     mockCoordinator(coordinator);
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyDouble()))
+    EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyInt()))
             .andReturn(ImmutableList.of(new 
BalancerSegmentHolder(druidServer1, segment1)).iterator())
             .anyTimes();
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), 
EasyMock.anyObject())).andAnswer(() -> {
@@ -403,7 +403,7 @@ public class BalanceSegmentsTest
 
     ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false);
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyDouble()))
+    EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyInt()))
             .andReturn(ImmutableList.of(new 
BalancerSegmentHolder(druidServer1, segment1)).iterator())
             .once();
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), 
EasyMock.anyObject()))
@@ -592,6 +592,7 @@ public class BalanceSegmentsTest
         .withDynamicConfigs(
             CoordinatorDynamicConfig.builder()
                                     .withMaxSegmentsToMove(1)
+                                    .withUseBatchedSegmentSampler(false)
                                     .withPercentOfSegmentsToConsiderPerMove(40)
                                     .build()
         )
@@ -788,7 +789,7 @@ public class BalanceSegmentsTest
     ).andReturn(
         ImmutableList.of(new BalancerSegmentHolder(druidServer2, 
segment2)).iterator()
     );
-    EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyDouble()))
+    EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyInt()))
             .andReturn(ImmutableList.of(new 
BalancerSegmentHolder(druidServer1, segment1)).iterator());
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), 
EasyMock.anyObject()))
             .andReturn(new ServerHolder(druidServer3, peon3))
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 926bccc53e..4538bc867b 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -164,7 +164,6 @@ public class CuratorDruidCoordinatorTest extends 
CuratorTestBase
         .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
         .withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
         .withCoordinatorKillMaxSegments(10)
-        .withLoadQueuePeonRepeatDelay(new Duration("PT0s"))
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
     sourceLoadQueueChildrenCache = new PathChildrenCache(
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
index cfbe2b7de9..db264e55e0 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
@@ -46,7 +46,6 @@ public class DruidCoordinatorConfigTest
     Assert.assertEquals(7776000000L, 
config.getCoordinatorKillDurationToRetain().getMillis());
     Assert.assertEquals(100, config.getCoordinatorKillMaxSegments());
     Assert.assertEquals(new Duration(15 * 60 * 1000), 
config.getLoadTimeoutDelay());
-    Assert.assertEquals(Duration.millis(50), 
config.getLoadQueuePeonRepeatDelay());
     Assert.assertTrue(config.getCompactionSkipLockedIntervals());
     Assert.assertFalse(config.getCoordinatorKillIgnoreDurationToRetain());
     Assert.assertEquals("http", config.getLoadQueuePeonType());
@@ -76,7 +75,6 @@ public class DruidCoordinatorConfigTest
     Assert.assertEquals(new Duration("PT1s"), 
config.getCoordinatorKillDurationToRetain());
     Assert.assertEquals(10000, config.getCoordinatorKillMaxSegments());
     Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay());
-    Assert.assertEquals(Duration.millis(100), 
config.getLoadQueuePeonRepeatDelay());
     Assert.assertFalse(config.getCompactionSkipLockedIntervals());
     Assert.assertTrue(config.getCoordinatorKillIgnoreDurationToRetain());
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 980c336aa2..a0502d9544 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -148,7 +148,6 @@ public class DruidCoordinatorTest extends CuratorTestBase
         .withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY))
         .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
         .withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
-        .withLoadQueuePeonRepeatDelay(new Duration("PT0s"))
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
     pathChildrenCache = new PathChildrenCache(
@@ -833,7 +832,6 @@ public class DruidCoordinatorTest extends CuratorTestBase
         .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
         .withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD))
         .withCoordinatorKillMaxSegments(10)
-        .withLoadQueuePeonRepeatDelay(new Duration("PT0s"))
         .withCompactionSkippedLockedIntervals(false)
         .withCoordinatorKillIgnoreDurationToRetain(false)
         .build();
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index a1a1f06e05..aed032b196 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -75,7 +75,6 @@ public class HttpLoadQueuePeonTest
 
   final TestDruidCoordinatorConfig config = new 
TestDruidCoordinatorConfig.Builder()
       .withCoordinatorKillMaxSegments(10)
-      .withLoadQueuePeonRepeatDelay(Duration.ZERO)
       .withCoordinatorKillIgnoreDurationToRetain(false)
       .withHttpLoadQueuePeonBatchSize(2)
       .build();
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
index 6410f9e5ef..3929aa64d5 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
@@ -90,7 +90,6 @@ public class LoadQueuePeonTest extends CuratorTestBase
         Execs.singleThreaded("test_load_queue_peon-%d"),
         new TestDruidCoordinatorConfig.Builder()
             .withCoordinatorKillMaxSegments(10)
-            .withLoadQueuePeonRepeatDelay(Duration.millis(0))
             .withCoordinatorKillIgnoreDurationToRetain(false)
             .build()
     );
@@ -281,7 +280,6 @@ public class LoadQueuePeonTest extends CuratorTestBase
         new TestDruidCoordinatorConfig.Builder()
             .withLoadTimeoutDelay(new Duration(1))
             .withCoordinatorKillMaxSegments(10)
-            .withLoadQueuePeonRepeatDelay(new Duration("PT1s"))
             .withCoordinatorKillIgnoreDurationToRetain(false)
             .build()
     );
@@ -322,7 +320,6 @@ public class LoadQueuePeonTest extends CuratorTestBase
         new TestDruidCoordinatorConfig.Builder()
             .withLoadTimeoutDelay(new Duration(1))
             .withCoordinatorKillMaxSegments(10)
-            .withLoadQueuePeonRepeatDelay(new Duration("PT1s"))
             .withCoordinatorKillIgnoreDurationToRetain(false)
             .build()
     );
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
index 9abcb105c3..a7b92eaf8d 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
@@ -40,7 +40,6 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon
         new TestDruidCoordinatorConfig.Builder()
             .withLoadTimeoutDelay(new Duration(1))
             .withCoordinatorKillMaxSegments(10)
-            .withLoadQueuePeonRepeatDelay(new Duration("PT1s"))
             .withCoordinatorKillIgnoreDurationToRetain(false)
             .build()
     );
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
index 70086abd77..4181ba2bbe 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
@@ -19,173 +19,65 @@
 
 package org.apache.druid.server.coordinator;
 
-import com.google.common.collect.Lists;
-import org.apache.druid.client.ImmutableDruidServer;
-import org.apache.druid.client.ImmutableDruidServerTests;
-import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.easymock.EasyMock;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class ReservoirSegmentSamplerTest
 {
-  private ImmutableDruidServer druidServer1;
-  private ImmutableDruidServer druidServer2;
-  private ImmutableDruidServer druidServer3;
-  private ImmutableDruidServer druidServer4;
-
-  private ServerHolder holder1;
-  private ServerHolder holder2;
-  private ServerHolder holder3;
-  private ServerHolder holder4;
-
-  private DataSegment segment1;
-  private DataSegment segment2;
-  private DataSegment segment3;
-  private DataSegment segment4;
-  List<DataSegment> segments1;
-  List<DataSegment> segments2;
-  List<DataSegment> segments3;
-  List<DataSegment> segments4;
-  List<DataSegment> segments;
+
+  /**
+   * num segments = 10 x 100 days
+   */
+  private final List<DataSegment> segments =
+      CreateDataSegments.ofDatasource("wiki")
+                        .forIntervals(100, Granularities.DAY)
+                        .startingAt("2022-01-01")
+                        .withNumPartitions(10)
+                        .eachOfSizeInMb(100);
 
   @Before
   public void setUp()
   {
-    druidServer1 = EasyMock.createMock(ImmutableDruidServer.class);
-    druidServer2 = EasyMock.createMock(ImmutableDruidServer.class);
-    druidServer3 = EasyMock.createMock(ImmutableDruidServer.class);
-    druidServer4 = EasyMock.createMock(ImmutableDruidServer.class);
-    holder1 = EasyMock.createMock(ServerHolder.class);
-    holder2 = EasyMock.createMock(ServerHolder.class);
-    holder3 = EasyMock.createMock(ServerHolder.class);
-    holder4 = EasyMock.createMock(ServerHolder.class);
-    segment1 = EasyMock.createMock(DataSegment.class);
-    segment2 = EasyMock.createMock(DataSegment.class);
-    segment3 = EasyMock.createMock(DataSegment.class);
-    segment4 = EasyMock.createMock(DataSegment.class);
-
-    DateTime start1 = DateTimes.of("2012-01-01");
-    DateTime start2 = DateTimes.of("2012-02-01");
-    DateTime version = DateTimes.of("2012-03-01");
-    segment1 = new DataSegment(
-        "datasource1",
-        new Interval(start1, start1.plusHours(1)),
-        version.toString(),
-        new HashMap<>(),
-        new ArrayList<>(),
-        new ArrayList<>(),
-        NoneShardSpec.instance(),
-        0,
-        11L
-    );
-    segment2 = new DataSegment(
-        "datasource1",
-        new Interval(start2, start2.plusHours(1)),
-        version.toString(),
-        new HashMap<>(),
-        new ArrayList<>(),
-        new ArrayList<>(),
-        NoneShardSpec.instance(),
-        0,
-        7L
-    );
-    segment3 = new DataSegment(
-        "datasource2",
-        new Interval(start1, start1.plusHours(1)),
-        version.toString(),
-        new HashMap<>(),
-        new ArrayList<>(),
-        new ArrayList<>(),
-        NoneShardSpec.instance(),
-        0,
-        4L
-    );
-    segment4 = new DataSegment(
-        "datasource2",
-        new Interval(start2, start2.plusHours(1)),
-        version.toString(),
-        new HashMap<>(),
-        new ArrayList<>(),
-        new ArrayList<>(),
-        NoneShardSpec.instance(),
-        0,
-        8L
-    );
-
-    segments = Lists.newArrayList(segment1, segment2, segment3, segment4);
-
-    segments1 = Collections.singletonList(segment1);
-    segments2 = Collections.singletonList(segment2);
-    segments3 = Collections.singletonList(segment3);
-    segments4 = Collections.singletonList(segment4);
   }
 
-  //checks if every segment is selected at least once out of 5000 trials
+  //checks if every segment is selected at least once out of 50 trials
   @Test
-  public void getRandomBalancerSegmentHolderTest()
+  public void testEverySegmentGetsPickedAtleastOnce()
   {
-    int iterations = 5000;
-
-    
EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
-    ImmutableDruidServerTests.expectSegments(druidServer1, segments1);
-    EasyMock.replay(druidServer1);
-
-    
EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
-    ImmutableDruidServerTests.expectSegments(druidServer2, segments2);
-    EasyMock.replay(druidServer2);
-
-    
EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
-    ImmutableDruidServerTests.expectSegments(druidServer3, segments3);
-    EasyMock.replay(druidServer3);
-
-    
EasyMock.expect(druidServer4.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
-    ImmutableDruidServerTests.expectSegments(druidServer4, segments4);
-    EasyMock.replay(druidServer4);
-
-    // Have to use anyTimes() because the number of times a segment on a given 
server is chosen is indetermistic.
-    EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes();
-    EasyMock.replay(holder1);
-    EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes();
-    EasyMock.replay(holder2);
-    EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes();
-    EasyMock.replay(holder3);
-    EasyMock.expect(holder4.getServer()).andReturn(druidServer4).anyTimes();
-    EasyMock.replay(holder4);
-
-    List<ServerHolder> holderList = new ArrayList<>();
-    holderList.add(holder1);
-    holderList.add(holder2);
-    holderList.add(holder3);
-    holderList.add(holder4);
+    int iterations = 50;
 
+    final List<ServerHolder> servers = Arrays.asList(
+        createHistorical("server1", segments.get(0)),
+        createHistorical("server2", segments.get(1)),
+        createHistorical("server3", segments.get(2)),
+        createHistorical("server4", segments.get(3))
+    );
     Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
     for (int i = 0; i < iterations; i++) {
       // due to the pseudo-randomness of this method, we may not select a 
segment every single time no matter what.
-      segmentCountMap.put(
-          ReservoirSegmentSampler.getRandomBalancerSegmentHolders(holderList, 
Collections.emptySet(), 1).get(0).getSegment(),
-          1
+      segmentCountMap.compute(
+          ReservoirSegmentSampler
+              .getRandomBalancerSegmentHolders(servers, 
Collections.emptySet(), 1)
+              .get(0).getSegment(),
+          (segment, count) -> count == null ? 1 : count + 1
       );
     }
 
-    for (DataSegment segment : segments) {
-      Assert.assertEquals(new Integer(1), segmentCountMap.get(segment));
-    }
-
-    EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4);
-    EasyMock.verify(holder1, holder2, holder3, holder4);
+    // Verify that each segment has been chosen at least once
+    Assert.assertEquals(4, segmentCountMap.size());
   }
 
   /**
@@ -195,56 +87,149 @@ public class ReservoirSegmentSamplerTest
   @Test
   public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit()
   {
-    int iterations = 5000;
-
-    
EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
-    ImmutableDruidServerTests.expectSegments(druidServer1, segments1);
-    EasyMock.replay(druidServer1);
-
-    
EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
-    ImmutableDruidServerTests.expectSegments(druidServer2, segments2);
-    EasyMock.replay(druidServer2);
-
-    
EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
-    ImmutableDruidServerTests.expectSegments(druidServer3, segments3);
-    EasyMock.replay(druidServer3);
-
-    ImmutableDruidServerTests.expectSegments(druidServer4, segments4);
-    EasyMock.replay(druidServer4);
-
-    // Have to use anyTimes() because the number of times a segment on a given 
server is chosen is indetermistic.
-    EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes();
-    EasyMock.replay(holder1);
-    EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes();
-    EasyMock.replay(holder2);
-    EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes();
-    EasyMock.replay(holder3);
-    // We only run getServer() each time we calculate the limit on segments to 
consider. Always 5k
-    EasyMock.expect(holder4.getServer()).andReturn(druidServer4).times(5000);
-    EasyMock.replay(holder4);
-
-    List<ServerHolder> holderList = new ArrayList<>();
-    holderList.add(holder1);
-    holderList.add(holder2);
-    holderList.add(holder3);
-    holderList.add(holder4);
+    int iterations = 50;
+
+    final DataSegment excludedSegment = segments.get(3);
+    final List<ServerHolder> servers = Arrays.asList(
+        createHistorical("server1", segments.get(0)),
+        createHistorical("server2", segments.get(1)),
+        createHistorical("server3", segments.get(2)),
+        createHistorical("server4", excludedSegment)
+    );
 
     Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
+
+    final double percentOfSegmentsToConsider = 75.0;
     for (int i = 0; i < iterations; i++) {
-      segmentCountMap.put(
-          ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, 
Collections.emptySet(), 75).getSegment(), 1
+      segmentCountMap.compute(
+          ReservoirSegmentSampler
+              .getRandomBalancerSegmentHolder(servers, Collections.emptySet(), 
percentOfSegmentsToConsider)
+              .getSegment(),
+          (segment, count) -> count == null ? 1 : count + 1
       );
     }
 
-    for (DataSegment segment : segments) {
-      if (!segment.equals(segment4)) {
-        Assert.assertEquals(new Integer(1), segmentCountMap.get(segment));
-      } else {
-        Assert.assertNull(segmentCountMap.get(segment));
+    // Verify that the segment on server4 is never chosen because of limit
+    Assert.assertFalse(segmentCountMap.containsKey(excludedSegment));
+    Assert.assertEquals(3, segmentCountMap.size());
+  }
+
+  @Test
+  public void testSegmentsOnBrokersAreIgnored()
+  {
+    final ServerHolder historical = createHistorical("hist1", segments.get(0), 
segments.get(1));
+
+    final ServerHolder broker = new ServerHolder(
+        new DruidServer("broker1", "broker1", null, 1000, ServerType.BROKER, 
null, 1)
+            .addDataSegment(segments.get(2))
+            .addDataSegment(segments.get(3))
+            .toImmutableDruidServer(),
+        new LoadQueuePeonTester()
+    );
+
+    // Try to pick all the segments on the servers
+    List<BalancerSegmentHolder> pickedSegments = 
ReservoirSegmentSampler.getRandomBalancerSegmentHolders(
+        Arrays.asList(historical, broker),
+        Collections.emptySet(),
+        10
+    );
+
+    // Verify that only the segments on the historical are picked
+    Assert.assertEquals(2, pickedSegments.size());
+    for (BalancerSegmentHolder holder : pickedSegments) {
+      Assert.assertEquals(historical.getServer(), holder.getFromServer());
+    }
+  }
+
+  @Test
+  public void testBroadcastSegmentsAreIgnored()
+  {
+    // num segments = 1 x 4 days
+    final String broadcastDatasource = "ds_broadcast";
+    final List<DataSegment> broadcastSegments
+        = CreateDataSegments.ofDatasource(broadcastDatasource)
+                            .forIntervals(4, Granularities.DAY)
+                            .startingAt("2022-01-01")
+                            .withNumPartitions(1)
+                            .eachOfSizeInMb(100);
+
+    final List<ServerHolder> servers = Arrays.asList(
+        createHistorical("server1", broadcastSegments.toArray(new 
DataSegment[0])),
+        createHistorical("server2", segments.get(0), segments.get(1))
+    );
+
+    // Try to pick all the segments on the servers
+    List<BalancerSegmentHolder> pickedSegments = ReservoirSegmentSampler
+        .getRandomBalancerSegmentHolders(servers, 
Collections.singleton(broadcastDatasource), 10);
+
+    // Verify that none of the broadcast segments are picked
+    Assert.assertEquals(2, pickedSegments.size());
+    for (BalancerSegmentHolder holder : pickedSegments) {
+      Assert.assertNotEquals(broadcastDatasource, 
holder.getSegment().getDataSource());
+    }
+  }
+
+  @Test(timeout = 60_000)
+  public void testNumberOfIterationsToCycleThroughAllSegments()
+  {
+    // The number of runs required for each sample percentage
+    // remains more or less fixed, even with a larger number of segments
+    final int[] samplePercentages = {100, 50, 10, 5, 1};
+    final int[] expectedIterations = {1, 20, 100, 200, 1000};
+
+    final int[] totalObservedIterations = new int[5];
+    for (int i = 0; i < 50; ++i) {
+      for (int j = 0; j < samplePercentages.length; ++j) {
+        totalObservedIterations[j] += 
countMinRunsWithSamplePercent(samplePercentages[j]);
+      }
+    }
+
+    for (int j = 0; j < samplePercentages.length; ++j) {
+      double avgObservedIterations = totalObservedIterations[j] / 50.0;
+      Assert.assertTrue(avgObservedIterations <= expectedIterations[j]);
+    }
+
+  }
+
+  /**
+   * Returns the minimum number of iterations of the reservoir sampling 
required
+   * to pick each segment atleast once.
+   * <p>
+   * {@code k = sampleSize = totalNumSegments * samplePercentage}
+   */
+  private int countMinRunsWithSamplePercent(int samplePercentage)
+  {
+    final int numSegments = segments.size();
+    final List<ServerHolder> servers = Arrays.asList(
+        createHistorical("server1", segments.subList(0, numSegments / 
2).toArray(new DataSegment[0])),
+        createHistorical("server2", segments.subList(numSegments / 2, 
numSegments).toArray(new DataSegment[0]))
+    );
+
+    final Set<DataSegment> pickedSegments = new HashSet<>();
+
+    int sampleSize = (int) (numSegments * samplePercentage / 100.0);
+
+    int numIterations = 1;
+    for (; numIterations < 10000; ++numIterations) {
+      ReservoirSegmentSampler
+          .getRandomBalancerSegmentHolders(servers, Collections.emptySet(), 
sampleSize)
+          .forEach(holder -> pickedSegments.add(holder.getSegment()));
+
+      if (pickedSegments.size() >= numSegments) {
+        break;
       }
     }
 
-    EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4);
-    EasyMock.verify(holder1, holder2, holder3, holder4);
+    return numIterations;
+  }
+
+  private ServerHolder createHistorical(String serverName, DataSegment... 
loadedSegments)
+  {
+    final DruidServer server =
+        new DruidServer(serverName, serverName, null, 100000, 
ServerType.HISTORICAL, "normal", 1);
+    for (DataSegment segment : loadedSegments) {
+      server.addDataSegment(segment);
+    }
+    return new ServerHolder(server.toImmutableDruidServer(), new 
LoadQueuePeonTester());
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
index 8153dd7942..271bda1131 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
@@ -39,7 +39,6 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
   private final Duration coordinatorRuleKillDurationToRetain;
   private final Duration coordinatorDatasourceKillPeriod;
   private final Duration coordinatorDatasourceKillDurationToRetain;
-  private final Duration loadQueuePeonRepeatDelay;
   private final int coordinatorKillMaxSegments;
   private final boolean compactionSkipLockedIntervals;
   private final boolean coordinatorKillIgnoreDurationToRetain;
@@ -67,7 +66,6 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
       Duration coordinatorDatasourceKillPeriod,
       Duration coordinatorDatasourceKillDurationToRetain,
       int coordinatorKillMaxSegments,
-      Duration loadQueuePeonRepeatDelay,
       boolean compactionSkipLockedIntervals,
       boolean coordinatorKillIgnoreDurationToRetain,
       String loadQueuePeonType,
@@ -94,7 +92,6 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
     this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod;
     this.coordinatorDatasourceKillDurationToRetain = 
coordinatorDatasourceKillDurationToRetain;
     this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
-    this.loadQueuePeonRepeatDelay = loadQueuePeonRepeatDelay;
     this.compactionSkipLockedIntervals = compactionSkipLockedIntervals;
     this.coordinatorKillIgnoreDurationToRetain = 
coordinatorKillIgnoreDurationToRetain;
     this.loadQueuePeonType = loadQueuePeonType;
@@ -206,12 +203,6 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
     return loadTimeoutDelay == null ? super.getLoadTimeoutDelay() : 
loadTimeoutDelay;
   }
 
-  @Override
-  public Duration getLoadQueuePeonRepeatDelay()
-  {
-    return loadQueuePeonRepeatDelay;
-  }
-
   @Override
   public boolean getCompactionSkipLockedIntervals()
   {
@@ -272,7 +263,6 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
     private static final Duration DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD = 
new Duration("PT86400s");
     private static final Duration 
DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN = new 
Duration("PT7776000s");
     private static final Duration DEFAULT_LOAD_TIMEOUT_DELAY = new Duration(15 
* 60 * 1000);
-    private static final Duration DEFAULT_LOAD_QUEUE_PEON_REPEAT_DELAY = 
Duration.millis(50);
     private static final String DEFAULT_LOAD_QUEUE_PEON_TYPE = "curator";
     private static final int 
DEFAULT_CURATOR_LOAD_QUEUE_PEON_NUM_CALLBACK_THREADS = 2;
     private static final Duration DEFAULT_HTTP_LOAD_QUEUE_PEON_REPEAT_DELAY = 
Duration.millis(60000);
@@ -299,7 +289,6 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
     private Duration coordinatorDatasourceKillPeriod;
     private Duration coordinatorDatasourceKillDurationToRetain;
     private Duration loadTimeoutDelay;
-    private Duration loadQueuePeonRepeatDelay;
     private String loadQueuePeonType;
     private Duration httpLoadQueuePeonRepeatDelay;
     private Integer curatorLoadQueuePeonNumCallbackThreads;
@@ -409,12 +398,6 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
       return this;
     }
 
-    public Builder withLoadQueuePeonRepeatDelay(Duration 
loadQueuePeonRepeatDelay)
-    {
-      this.loadQueuePeonRepeatDelay = loadQueuePeonRepeatDelay;
-      return this;
-    }
-
     public Builder withLoadQueuePeonType(String loadQueuePeonType)
     {
       this.loadQueuePeonType = loadQueuePeonType;
@@ -483,7 +466,6 @@ public class TestDruidCoordinatorConfig extends 
DruidCoordinatorConfig
           coordinatorDatasourceKillPeriod == null ? 
DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD : coordinatorDatasourceKillPeriod,
           coordinatorDatasourceKillDurationToRetain == null ? 
DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN : 
coordinatorDatasourceKillDurationToRetain,
           coordinatorKillMaxSegments == null ? 
DEFAULT_COORDINATOR_KILL_MAX_SEGMENTS : coordinatorKillMaxSegments,
-          loadQueuePeonRepeatDelay == null ? 
DEFAULT_LOAD_QUEUE_PEON_REPEAT_DELAY : loadQueuePeonRepeatDelay,
           compactionSkippedLockedIntervals == null ? 
DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS : compactionSkippedLockedIntervals,
           coordinatorKillIgnoreDurationToRetain == null ? 
DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN : 
coordinatorKillIgnoreDurationToRetain,
           loadQueuePeonType == null ? DEFAULT_LOAD_QUEUE_PEON_TYPE : 
loadQueuePeonType,
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 48d730cd7c..ed7fea5aaf 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -20,281 +20,221 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
-import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
-import org.easymock.EasyMock;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
-import org.junit.Assert;
+import org.joda.time.Period;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.mockito.Answers;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 
 /**
+ *
  */
-@RunWith(Enclosed.class)
+@RunWith(MockitoJUnitRunner.class)
 public class KillUnusedSegmentsTest
 {
-  /**
-   * Standing up new tests with mocks was easier than trying to move the 
existing tests to use mocks for consistency.
-   * In the future, if all tests are moved to use the same structure, this 
inner static class can be gotten rid of.
-   */
-  @RunWith(MockitoJUnitRunner.class)
-  public static class MockedTest
+  private static final int MAX_SEGMENTS_TO_KILL = 10;
+  private static final Duration COORDINATOR_KILL_PERIOD = 
Duration.standardMinutes(2);
+  private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1);
+  private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1);
+
+  @Mock
+  private SegmentsMetadataManager segmentsMetadataManager;
+  @Mock
+  private IndexingServiceClient indexingServiceClient;
+  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+  private DruidCoordinatorConfig config;
+
+  @Mock
+  private DruidCoordinatorRuntimeParams params;
+  @Mock
+  private CoordinatorDynamicConfig coordinatorDynamicConfig;
+
+  private DataSegment yearOldSegment;
+  private DataSegment monthOldSegment;
+  private DataSegment dayOldSegment;
+  private DataSegment hourOldSegment;
+  private DataSegment nextDaySegment;
+  private DataSegment nextMonthSegment;
+
+  private KillUnusedSegments target;
+
+  @Before
+  public void setup()
   {
-    private static final Set<String> ALL_DATASOURCES = ImmutableSet.of("DS1", 
"DS2", "DS3");
-    private static final int MAX_SEGMENTS_TO_KILL = 10;
-    private static final Duration COORDINATOR_KILL_PERIOD = 
Duration.standardMinutes(2);
-    private static final Duration DURATION_TO_RETAIN = 
Duration.standardDays(1);
-    private static final Duration INDEXING_PERIOD = 
Duration.standardMinutes(1);
-
-    @Mock
-    private SegmentsMetadataManager segmentsMetadataManager;
-    @Mock
-    private IndexingServiceClient indexingServiceClient;
-    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-    private DruidCoordinatorConfig config;
-
-    @Mock
-    private DruidCoordinatorRuntimeParams params;
-    @Mock
-    private CoordinatorDynamicConfig coordinatorDynamicConfig;
-    private KillUnusedSegments target;
-
-    @Before
-    public void setup()
-    {
-      
Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
-      
Mockito.doReturn(ALL_DATASOURCES).when(segmentsMetadataManager).retrieveAllDataSourceNames();
-      
Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
-      
Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
-      
Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
-      
Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
-      target = new KillUnusedSegments(segmentsMetadataManager, 
indexingServiceClient, config);
-    }
-    @Test
-    public void testRunWihNoIntervalShouldNotKillAnySegments()
-    {
-      target.run(params);
-      Mockito.verify(indexingServiceClient, Mockito.never())
-             .killUnusedSegments(anyString(), anyString(), 
any(Interval.class));
-    }
-
-    @Test
-    public void 
testRunWihSpecificDatasourceAndNoIntervalShouldNotKillAnySegments()
-    {
-      
Mockito.when(coordinatorDynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn()).thenReturn(Collections.singleton("DS1"));
-      target.run(params);
-      Mockito.verify(indexingServiceClient, Mockito.never())
-             .killUnusedSegments(anyString(), anyString(), 
any(Interval.class));
-    }
+    
Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
+    
Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
+    
Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
+    
Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
+    
Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
+
+    Mockito.doReturn(Collections.singleton("DS1"))
+           
.when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn();
+
+    final DateTime now = DateTimes.nowUtc();
+
+    yearOldSegment = createSegmentWithEnd(now.minusDays(365));
+    monthOldSegment = createSegmentWithEnd(now.minusDays(30));
+    dayOldSegment = createSegmentWithEnd(now.minusDays(1));
+    hourOldSegment = createSegmentWithEnd(now.minusHours(1));
+    nextDaySegment = createSegmentWithEnd(now.plusDays(1));
+    nextMonthSegment = createSegmentWithEnd(now.plusDays(30));
+
+    final List<DataSegment> unusedSegments = ImmutableList.of(
+        yearOldSegment,
+        monthOldSegment,
+        dayOldSegment,
+        hourOldSegment,
+        nextDaySegment,
+        nextMonthSegment
+    );
+
+    Mockito.when(
+        segmentsMetadataManager.getUnusedSegmentIntervals(
+            ArgumentMatchers.anyString(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.anyInt()
+        )
+    ).thenAnswer(invocation -> {
+      DateTime maxEndTime = invocation.getArgument(1);
+      List<Interval> unusedIntervals =
+          unusedSegments.stream()
+                        .map(DataSegment::getInterval)
+                        .filter(i -> i.getEnd().isBefore(maxEndTime))
+                        .collect(Collectors.toList());
+
+      int limit = invocation.getArgument(2);
+      return unusedIntervals.size() <= limit ? unusedIntervals : 
unusedIntervals.subList(0, limit);
+    });
+
+    target = new KillUnusedSegments(segmentsMetadataManager, 
indexingServiceClient, config);
   }
 
-  public static class FindIntervalsTest
+  @Test
+  public void testRunWithNoIntervalShouldNotKillAnySegments()
   {
-    @Test
-    public void testFindIntervalForKill()
-    {
-      testFindIntervalForKill(null, null);
-      testFindIntervalForKill(ImmutableList.of(), null);
-
-      testFindIntervalForKill(ImmutableList.of(Intervals.of("2014/2015")), 
Intervals.of("2014/2015"));
-
-      testFindIntervalForKill(
-          ImmutableList.of(Intervals.of("2014/2015"), 
Intervals.of("2016/2017")),
-          Intervals.of("2014/2017")
-      );
-
-      testFindIntervalForKill(
-          ImmutableList.of(Intervals.of("2014/2015"), 
Intervals.of("2015/2016")),
-          Intervals.of("2014/2016")
-      );
-
-      testFindIntervalForKill(
-          ImmutableList.of(Intervals.of("2015/2016"), 
Intervals.of("2014/2015")),
-          Intervals.of("2014/2016")
-      );
-
-      testFindIntervalForKill(
-          ImmutableList.of(Intervals.of("2015/2017"), 
Intervals.of("2014/2016")),
-          Intervals.of("2014/2017")
-      );
-
-      testFindIntervalForKill(
-          ImmutableList.of(
-              Intervals.of("2015/2019"),
-              Intervals.of("2014/2016"),
-              Intervals.of("2018/2020")
-          ),
-          Intervals.of("2014/2020")
-      );
-
-      testFindIntervalForKill(
-          ImmutableList.of(
-              Intervals.of("2015/2019"),
-              Intervals.of("2014/2016"),
-              Intervals.of("2018/2020"),
-              Intervals.of("2021/2022")
-          ),
-          Intervals.of("2014/2022")
-      );
-    }
-
-    private void testFindIntervalForKill(List<Interval> segmentIntervals, 
Interval expected)
-    {
-      SegmentsMetadataManager segmentsMetadataManager = 
EasyMock.createMock(SegmentsMetadataManager.class);
-      EasyMock.expect(
-          segmentsMetadataManager.getUnusedSegmentIntervals(
-              EasyMock.anyString(),
-              EasyMock.anyObject(DateTime.class),
-              EasyMock.anyInt()
-          )
-      ).andReturn(segmentIntervals);
-      EasyMock.replay(segmentsMetadataManager);
-      IndexingServiceClient indexingServiceClient = 
EasyMock.createMock(IndexingServiceClient.class);
+    
Mockito.doReturn(null).when(segmentsMetadataManager).getUnusedSegmentIntervals(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt()
+    );
+
+    target.run(params);
+    Mockito.verify(indexingServiceClient, Mockito.never())
+           .killUnusedSegments(anyString(), anyString(), any(Interval.class));
+  }
 
-      KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
-          segmentsMetadataManager,
-          indexingServiceClient,
-          new TestDruidCoordinatorConfig.Builder()
-              .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
-              .withLoadTimeoutDelay(new Duration(1))
-              .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
-              .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
-              .withCoordinatorKillMaxSegments(1000)
-              .withLoadQueuePeonRepeatDelay(Duration.ZERO)
-              .withCoordinatorKillIgnoreDurationToRetain(false)
-              .build()
-      );
+  @Test
+  public void 
testRunWithSpecificDatasourceAndNoIntervalShouldNotKillAnySegments()
+  {
+    Mockito.doReturn(Duration.standardDays(400))
+           .when(config).getCoordinatorKillDurationToRetain();
+    target = new KillUnusedSegments(segmentsMetadataManager, 
indexingServiceClient, config);
+
+    // No unused segment is older than the retention period
+    target.run(params);
+    Mockito.verify(indexingServiceClient, Mockito.never())
+           .killUnusedSegments(anyString(), anyString(), any(Interval.class));
+  }
 
-      Assert.assertEquals(
-          expected,
-          unusedSegmentsKiller.findIntervalForKill("test", 10000)
-      );
-    }
+  @Test
+  public void testDurationToRetain()
+  {
+    // Only segments more than a day old are killed
+    Interval expectedKillInterval = new Interval(
+        yearOldSegment.getInterval().getStart(),
+        dayOldSegment.getInterval().getEnd()
+    );
+    runAndVerifyKillInterval(expectedKillInterval);
+  }
 
-    /**
-     * Test that retainDuration is properly set based on the value available 
in the
-     * Coordinator config. Positive and Negative durations should work as well 
as
-     * null, if and only if ignoreDurationToRetain is true.
-     */
-    @Test
-    public void testRetainDurationValues()
-    {
-      // Positive duration to retain
-      KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
-          null,
-          null,
-          new TestDruidCoordinatorConfig.Builder()
-              .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
-              .withLoadTimeoutDelay(new Duration(1))
-              .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
-              .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
-              .withCoordinatorKillMaxSegments(1000)
-              .withLoadQueuePeonRepeatDelay(Duration.ZERO)
-              .withCoordinatorKillIgnoreDurationToRetain(false)
-              .build()
-      );
-      Assert.assertEquals((Long) Duration.parse("PT86400S").getMillis(), 
unusedSegmentsKiller.getRetainDuration());
+  @Test
+  public void testNegativeDurationToRetain()
+  {
+    // Duration to retain = -1 day, reinit target for config to take effect
+    Mockito.doReturn(DURATION_TO_RETAIN.negated())
+           .when(config).getCoordinatorKillDurationToRetain();
+    target = new KillUnusedSegments(segmentsMetadataManager, 
indexingServiceClient, config);
+
+    // Segments upto 1 day in the future are killed
+    Interval expectedKillInterval = new Interval(
+        yearOldSegment.getInterval().getStart(),
+        nextDaySegment.getInterval().getEnd()
+    );
+    runAndVerifyKillInterval(expectedKillInterval);
+  }
 
-      // Negative duration to retain
-      unusedSegmentsKiller = new KillUnusedSegments(
-          null,
-          null,
-          new TestDruidCoordinatorConfig.Builder()
-              .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
-              .withLoadTimeoutDelay(new Duration(1))
-              .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
-              .withCoordinatorKillDurationToRetain(Duration.parse("PT-86400S"))
-              .withCoordinatorKillMaxSegments(1000)
-              .withLoadQueuePeonRepeatDelay(Duration.ZERO)
-              .withCoordinatorKillIgnoreDurationToRetain(false)
-              .build()
-      );
-      Assert.assertEquals((Long) Duration.parse("PT-86400S").getMillis(), 
unusedSegmentsKiller.getRetainDuration());
-    }
+  @Test
+  public void testIgnoreDurationToRetain()
+  {
+    Mockito.doReturn(true)
+           .when(config).getCoordinatorKillIgnoreDurationToRetain();
+    target = new KillUnusedSegments(segmentsMetadataManager, 
indexingServiceClient, config);
+
+    // All future and past unused segments are killed
+    Interval expectedKillInterval = new Interval(
+        yearOldSegment.getInterval().getStart(),
+        nextMonthSegment.getInterval().getEnd()
+    );
+    runAndVerifyKillInterval(expectedKillInterval);
+  }
 
-    /**
-     * Test that the end time upper limit is properly computated for both 
positive and
-     * negative durations. Also ensure that if durationToRetain is to be 
ignored, that
-     * the upper limit is {@link DateTime} max time.
-     */
-    @Test
-    public void testGetEndTimeUpperLimit()
-    {
-      // If ignoreDurationToRetain is true, ignore the value configured for 
durationToRetain and return 9999-12-31T23:59
-      KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
-          null,
-          null,
-          new TestDruidCoordinatorConfig.Builder()
-              .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
-              .withLoadTimeoutDelay(new Duration(1))
-              .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
-              .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
-              .withCoordinatorKillMaxSegments(1000)
-              .withLoadQueuePeonRepeatDelay(Duration.ZERO)
-              .withCoordinatorKillIgnoreDurationToRetain(true)
-              .build()
-      );
-      Assert.assertEquals(
-          DateTimes.COMPARE_DATE_AS_STRING_MAX,
-          unusedSegmentsKiller.getEndTimeUpperLimit()
-      );
+  @Test
+  public void testMaxSegmentsToKill()
+  {
+    Mockito.doReturn(1)
+           .when(config).getCoordinatorKillMaxSegments();
+    target = new KillUnusedSegments(segmentsMetadataManager, 
indexingServiceClient, config);
 
-      // Testing a negative durationToRetain period returns proper date in 
future
-      unusedSegmentsKiller = new KillUnusedSegments(
-          null,
-          null,
-          new TestDruidCoordinatorConfig.Builder()
-              .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
-              .withLoadTimeoutDelay(new Duration(1))
-              .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
-              .withCoordinatorKillDurationToRetain(Duration.parse("PT-86400S"))
-              .withCoordinatorKillMaxSegments(1000)
-              .withLoadQueuePeonRepeatDelay(Duration.ZERO)
-              .withCoordinatorKillIgnoreDurationToRetain(false)
-              .build()
-      );
+    // Only 1 unused segment is killed
+    runAndVerifyKillInterval(yearOldSegment.getInterval());
+  }
 
-      DateTime expectedTime = 
DateTimes.nowUtc().minus(Duration.parse("PT-86400S").getMillis());
-      Assert.assertEquals(expectedTime, 
unusedSegmentsKiller.getEndTimeUpperLimit());
+  private void runAndVerifyKillInterval(Interval expectedKillInterval)
+  {
+    target.run(params);
+    Mockito.verify(indexingServiceClient, Mockito.times(1)).killUnusedSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.eq("DS1"),
+        ArgumentMatchers.eq(expectedKillInterval)
+    );
+  }
 
-      // Testing a positive durationToRetain period returns expected value in 
the past
-      unusedSegmentsKiller = new KillUnusedSegments(
-          null,
-          null,
-          new TestDruidCoordinatorConfig.Builder()
-              .withCoordinatorIndexingPeriod(Duration.parse("PT76400S"))
-              .withLoadTimeoutDelay(new Duration(1))
-              .withCoordinatorKillPeriod(Duration.parse("PT86400S"))
-              .withCoordinatorKillDurationToRetain(Duration.parse("PT86400S"))
-              .withCoordinatorKillMaxSegments(1000)
-              .withLoadQueuePeonRepeatDelay(Duration.ZERO)
-              .withCoordinatorKillIgnoreDurationToRetain(false)
-              .build()
-      );
-      expectedTime = 
DateTimes.nowUtc().minus(Duration.parse("PT86400S").getMillis());
-      Assert.assertEquals(expectedTime, 
unusedSegmentsKiller.getEndTimeUpperLimit());
-    }
+  private DataSegment createSegmentWithEnd(DateTime endTime)
+  {
+    return new DataSegment(
+        "DS1",
+        new Interval(Period.days(1), endTime),
+        DateTimes.nowUtc().toString(),
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        1,
+        0
+    );
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 28b84f62e9..01308d82e7 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -453,7 +453,6 @@ public class CoordinatorSimulationBuilder
           .withCoordinatorStartDelay(new Duration(1L))
           .withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
           .withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
-          .withLoadQueuePeonRepeatDelay(new Duration("PT0S"))
           .withLoadQueuePeonType("http")
           .withCoordinatorKillIgnoreDurationToRetain(false)
           .build();
diff --git 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index 7248903619..707ea1d0f9 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -78,6 +78,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
+        true,
         1,
         1,
         2,
@@ -100,6 +101,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
+        true,
         1,
         1,
         2,
@@ -122,6 +124,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
+        true,
         1,
         1,
         2,
@@ -144,6 +147,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
+        true,
         1,
         1,
         2,
@@ -158,7 +162,10 @@ public class CoordinatorDynamicConfigTest
         Integer.MAX_VALUE
     );
 
-    actual = 
CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual);
+    actual = CoordinatorDynamicConfig.builder()
+                                     
.withPercentOfSegmentsToConsiderPerMove(10)
+                                     .withUseBatchedSegmentSampler(false)
+                                     .build(actual);
     assertConfig(
         actual,
         1,
@@ -166,6 +173,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         10,
+        false,
         1,
         1,
         2,
@@ -188,6 +196,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         10,
+        false,
         1,
         1,
         2,
@@ -210,6 +219,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         10,
+        false,
         1,
         1,
         2,
@@ -315,7 +325,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         100,
-        1,
+        true, 1,
         1,
         2,
         true,
@@ -337,6 +347,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         100,
+        true,
         1,
         1,
         2,
@@ -359,6 +370,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         100,
+        true,
         1,
         1,
         2,
@@ -407,6 +419,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
+        true,
         1,
         1,
         2,
@@ -525,6 +538,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         100,
+        true,
         1,
         1,
         2,
@@ -574,6 +588,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
+        true,
         1,
         1,
         2,
@@ -638,6 +653,7 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
+        true,
         1,
         1,
         2,
@@ -665,6 +681,7 @@ public class CoordinatorDynamicConfigTest
         100,
         5,
         100,
+        true,
         15,
         10,
         1,
@@ -695,6 +712,7 @@ public class CoordinatorDynamicConfigTest
         100,
         5,
         100,
+        true,
         15,
         10,
         1,
@@ -785,6 +803,7 @@ public class CoordinatorDynamicConfigTest
       int expectedMergeSegmentsLimit,
       int expectedMaxSegmentsToMove,
       int expectedPercentOfSegmentsToConsiderPerMove,
+      boolean expectedUseBatchedSegmentSampler,
       int expectedReplicantLifetime,
       int expectedReplicationThrottleLimit,
       int expectedBalancerComputeThreads,
@@ -807,6 +826,7 @@ public class CoordinatorDynamicConfigTest
     Assert.assertEquals(expectedMergeSegmentsLimit, 
config.getMergeSegmentsLimit());
     Assert.assertEquals(expectedMaxSegmentsToMove, 
config.getMaxSegmentsToMove());
     Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, 
config.getPercentOfSegmentsToConsiderPerMove(), 0);
+    Assert.assertEquals(expectedUseBatchedSegmentSampler, 
config.useBatchedSegmentSampler());
     Assert.assertEquals(expectedReplicantLifetime, 
config.getReplicantLifetime());
     Assert.assertEquals(expectedReplicationThrottleLimit, 
config.getReplicationThrottleLimit());
     Assert.assertEquals(expectedBalancerComputeThreads, 
config.getBalancerComputeThreads());
diff --git 
a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
 
b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
index 1d148e6b18..eeb25db09c 100644
--- 
a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
+++ 
b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
@@ -197,7 +197,7 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: 
Field<CoordinatorDynamicConfig>[
   {
     name: 'useBatchedSegmentSampler',
     type: 'boolean',
-    defaultValue: false,
+    defaultValue: true,
     info: (
       <>
         Boolean flag for whether or not we should use the Reservoir Sampling 
with a reservoir of


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to