This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a6547febaf Remove unused coordinator dynamic configs (#14524)
a6547febaf is described below

commit a6547febaf8dcfb35f11073972317d7129035c98
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Jul 6 12:11:10 2023 +0530

    Remove unused coordinator dynamic configs (#14524)
    
    After #13197, several coordinator configs are now redundant as they are
    no longer used, either with `smartSegmentLoading` or without it.
    
    Changes:
    - Remove dynamic config `emitBalancingStats`: balancer error stats are
      always emitted, and debug stats can be logged by using `debugDimensions`
    - Remove dynamic configs `useBatchedSegmentSampler` and
      `percentOfSegmentsToConsiderPerMove`: batched segment sampling is always used
    - Add test to verify deserialization with unknown properties
    - Update `CoordinatorRunStats` to always track stats; this can be optimized later.
---
 .../coordinator/BalancerStrategyBenchmark.java     |   7 +-
 docs/configuration/index.md                        |   8 +-
 docs/operations/metrics.md                         |   2 -
 .../coordinator/CoordinatorDynamicConfig.java      |  97 ------------
 .../balancer/ReservoirSegmentSampler.java          |   2 +
 .../coordinator/loading/SegmentLoadingConfig.java  |  18 +--
 .../coordinator/stats/CoordinatorRunStats.java     |   9 +-
 .../server/coordinator/stats/CoordinatorStat.java  |  39 +++--
 .../druid/server/coordinator/stats/Stats.java      |  89 +++++------
 .../coordinator/CoordinatorRunStatsTest.java       | 170 +++++++++++----------
 .../coordinator/duty/BalanceSegmentsTest.java      |   5 +-
 .../simulate/CoordinatorSimulationBaseTest.java    |   6 +-
 .../simulate/CoordinatorSimulationBuilder.java     |   6 +-
 .../server/http/CoordinatorDynamicConfigTest.java  | 129 +++-------------
 .../coordinator-dynamic-config.tsx                 |  47 ------
 15 files changed, 201 insertions(+), 433 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java
index 0bf5611998..2afd99c8c3 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java
@@ -64,9 +64,6 @@ public class BalancerStrategyBenchmark
   private static final Interval TEST_SEGMENT_INTERVAL = 
Intervals.of("2012-03-15T00:00:00.000/2012-03-16T00:00:00.000");
   private static final int NUMBER_OF_SERVERS = 20;
 
-  @Param({"default", "useBatchedSegmentSampler"})
-  private String mode;
-  
   @Param({"10000", "100000", "1000000"})
   private int numberOfSegments;
   
@@ -79,9 +76,7 @@ public class BalancerStrategyBenchmark
   @Setup(Level.Trial)
   public void setup()
   {
-    if ("useBatchedSegmentSampler".equals(mode)) {
-      reservoirSize = maxSegmentsToMove;
-    }
+    reservoirSize = maxSegmentsToMove;
     
     List<List<DataSegment>> segmentList = new ArrayList<>(NUMBER_OF_SERVERS);
     IntStream.range(0, NUMBER_OF_SERVERS).forEach(i -> segmentList.add(new 
ArrayList<>()));
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index b741728aad..f0cf9c6011 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -931,11 +931,8 @@ A sample Coordinator dynamic config JSON object is shown 
below:
   "mergeBytesLimit": 100000000,
   "mergeSegmentsLimit" : 1000,
   "maxSegmentsToMove": 5,
-  "useBatchedSegmentSampler": false,
-  "percentOfSegmentsToConsiderPerMove": 100,
   "replicantLifetime": 15,
   "replicationThrottleLimit": 10,
-  "emitBalancingStats": false,
   "killDataSourceWhitelist": ["wikipedia", "testDatasource"],
   "decommissioningNodes": ["localhost:8182", "localhost:8282"],
   "decommissioningMaxPercentOfMaxSegmentsToMove": 70,
@@ -953,19 +950,16 @@ Issuing a GET request at the same URL will return the 
spec that is currently in
 |`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to 
merge.|524288000L|
 |`mergeSegmentsLimit`|The maximum number of segments that can be in a single 
[append task](../ingestion/tasks.md).|100|
 |`maxSegmentsToMove`|The maximum number of segments that can be moved at any 
given time.|100|
-|`useBatchedSegmentSampler`|Deprecated. Boolean flag for whether or not we 
should use the Reservoir Sampling with a reservoir of size k instead of fixed 
size 1 to pick segments to move. This option can be enabled to speed up the 
sampling of segments to be balanced, especially if there is a large number of 
segments in the cluster or if there are too many segments to move.|true|
-|`percentOfSegmentsToConsiderPerMove`|Deprecated. This will eventually be 
phased out by the batched segment sampler. You can enable the batched segment 
sampler now by setting the dynamic Coordinator config, 
`useBatchedSegmentSampler`, to `true`. Note that if you choose to enable the 
batched segment sampler, `percentOfSegmentsToConsiderPerMove` will no longer 
have any effect on balancing. If `useBatchedSegmentSampler == false`, this 
config defines the percentage of the total number of seg [...]
 |`replicantLifetime`|The maximum number of Coordinator runs for a segment to 
be replicated before we start alerting.|15|
 |`replicationThrottleLimit`|The maximum number of segments that can be in the 
replication queue of a historical tier at any given time.|500|
 |`balancerComputeThreads`|Thread pool size for computing moving cost of 
segments in segment balancing. Consider increasing this if you have a lot of 
segments and moving segments starts to get stuck.|1|
-|`emitBalancingStats`|Boolean flag for whether or not we should emit balancing 
stats. This is an expensive operation.|false|
 |`killDataSourceWhitelist`|List of specific data sources for which kill tasks 
are sent if property `druid.coordinator.kill.on` is true. This can be a list of 
comma-separated data source names or a JSON array.|none|
 |`killPendingSegmentsSkipList`|List of data sources for which pendingSegments 
are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is 
true. This can be a list of comma-separated data sources or a JSON array.|none|
 |`maxSegmentsInNodeLoadingQueue`|The maximum number of segments allowed in the 
load queue of any given server. Use this parameter to load segments faster if, 
for example, the cluster contains slow-loading nodes or if there are too many 
segments to be replicated to a particular node (when faster loading is 
preferred to better segments distribution). The optimal value depends on the 
loading speed of segments, acceptable replication time and number of nodes. 
|500|
 |`useRoundRobinSegmentAssignment`|Boolean flag for whether segments should be 
assigned to historicals in a round robin fashion. When disabled, segment 
assignment is done using the chosen balancer strategy. When enabled, this can 
speed up segment assignments leaving balancing to move the segments to their 
optimal locations (based on the balancer strategy) lazily. |true|
 |`decommissioningNodes`| List of historical servers to 'decommission'. 
Coordinator will not assign new segments to 'decommissioning' servers,  and 
segments will be moved away from them to be placed on non-decommissioning 
servers at the maximum rate specified by 
`decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
 |`decommissioningMaxPercentOfMaxSegmentsToMove`| Upper limit of segments the 
Coordinator can move from decommissioning servers to active non-decommissioning 
servers during a single run. This value is relative to the total maximum number 
of segments that can be moved at any given time based upon the value of 
`maxSegmentsToMove`.<br /><br />If 
`decommissioningMaxPercentOfMaxSegmentsToMove` is 0, the Coordinator does not 
move segments to decommissioning servers, effectively putting them in  [...]
-|`pauseCoordination`| Boolean flag for whether or not the coordinator should 
execute its various duties of coordinating the cluster. Setting this to true 
essentially pauses all coordination work while allowing the API to remain up. 
Duties that are paused include all classes that implement the `CoordinatorDuty` 
Interface. Such duties include: Segment balancing, Segment compaction, Emission 
of metrics controlled by the dynamic coordinator config `emitBalancingStats`, 
Submitting kill tasks  [...]
+|`pauseCoordination`| Boolean flag for whether or not the coordinator should 
execute its various duties of coordinating the cluster. Setting this to true 
essentially pauses all coordination work while allowing the API to remain up. 
Duties that are paused include all classes that implement the `CoordinatorDuty` 
Interface. Such duties include: Segment balancing, Segment compaction, 
Submitting kill tasks for unused segments (if enabled), Logging of used 
segments in the cluster, Marking of n [...]
 |`replicateAfterLoadTimeout`| Boolean flag for whether or not additional 
replication is needed for segments that have failed to load due to the expiry 
of `druid.coordinator.load.timeout`. If this is set to true, the coordinator 
will attempt to replicate the failed segment on a different historical server. 
This helps improve the segment availability if there are a few slow historicals 
in the cluster. However, the slow historical may still load the segment later 
and the coordinator may iss [...]
 |`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary 
segment replicants to load per Coordination run. This number can be set to put 
a hard upper limit on the number of replicants loaded. It is a tool that can 
help prevent long delays in new data being available for query after events 
that require many non-primary replicants to be loaded by the cluster; such as a 
Historical node disconnecting from the cluster. The default value essentially 
means there is no limit on  [...]
 
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 5bc292dfd4..9003d0cb24 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -325,8 +325,6 @@ These metrics are for the Druid Coordinator and are reset 
each time the Coordina
 |`metadata/kill/datasource/count`|Total number of datasource metadata that 
were automatically deleted from metadata store per each Coordinator kill 
datasource duty run (Note: datasource metadata only exists for datasource 
created from supervisor). This metric can help adjust 
`druid.coordinator.kill.datasource.durationToRetain` configuration based on 
whether more or less datasource metadata need to be deleted per cycle. Note 
that this metric is only emitted when `druid.coordinator.kill.da [...]
 |`init/serverview/time`|Time taken to initialize the coordinator server 
view.||Depends on the number of segments|
 
-If `emitBalancingStats` is set to `true` in the Coordinator [dynamic 
configuration](../configuration/index.md#dynamic-configuration), then [log 
entries](../configuration/logging.md) for class 
`org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics` will have 
extra information on balancing decisions.
-
 ## General Health
 
 ### Service Health
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index cd0e2b67e7..9a38029403 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -59,14 +59,9 @@ public class CoordinatorDynamicConfig
   private final long mergeBytesLimit;
   private final int mergeSegmentsLimit;
   private final int maxSegmentsToMove;
-  @Deprecated
-  private final double percentOfSegmentsToConsiderPerMove;
-  @Deprecated
-  private final boolean useBatchedSegmentSampler;
   private final int replicantLifetime;
   private final int replicationThrottleLimit;
   private final int balancerComputeThreads;
-  private final boolean emitBalancingStats;
   private final boolean useRoundRobinSegmentAssignment;
   private final boolean smartSegmentLoading;
 
@@ -127,12 +122,9 @@ public class CoordinatorDynamicConfig
       @JsonProperty("mergeBytesLimit") long mergeBytesLimit,
       @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
       @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
-      @Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") 
@Nullable Double percentOfSegmentsToConsiderPerMove,
-      @Deprecated @JsonProperty("useBatchedSegmentSampler") Boolean 
useBatchedSegmentSampler,
       @JsonProperty("replicantLifetime") int replicantLifetime,
       @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
       @JsonProperty("balancerComputeThreads") int balancerComputeThreads,
-      @JsonProperty("emitBalancingStats") boolean emitBalancingStats,
       // Type is Object here so that we can support both string and list as 
Coordinator console can not send array of
       // strings in the update request. See 
https://github.com/apache/druid/issues/3055.
       // Keeping the legacy 'killDataSourceWhitelist' property name for 
backward compatibility. When the project is
@@ -161,31 +153,9 @@ public class CoordinatorDynamicConfig
     this.maxSegmentsToMove = maxSegmentsToMove;
     this.smartSegmentLoading = Builder.valueOrDefault(smartSegmentLoading, 
Defaults.SMART_SEGMENT_LOADING);
 
-    if (percentOfSegmentsToConsiderPerMove == null) {
-      log.debug(
-          "percentOfSegmentsToConsiderPerMove was null! This is likely because 
your metastore does not "
-          + "reflect this configuration being added to Druid in a recent 
release. Druid is defaulting the value "
-          + "to the Druid default of %f. It is recommended that you re-submit 
your dynamic config with your "
-          + "desired value for percentOfSegmentsToConsideredPerMove",
-          Defaults.PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
-      );
-      percentOfSegmentsToConsiderPerMove = 
Defaults.PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE;
-    }
-    Preconditions.checkArgument(
-        percentOfSegmentsToConsiderPerMove > 0 && 
percentOfSegmentsToConsiderPerMove <= 100,
-        "'percentOfSegmentsToConsiderPerMove' should be between 1 and 100"
-    );
-    this.percentOfSegmentsToConsiderPerMove = 
percentOfSegmentsToConsiderPerMove;
-
-    this.useBatchedSegmentSampler = Builder.valueOrDefault(
-        useBatchedSegmentSampler,
-        Defaults.USE_BATCHED_SEGMENT_SAMPLER
-    );
-
     this.replicantLifetime = replicantLifetime;
     this.replicationThrottleLimit = replicationThrottleLimit;
     this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
-    this.emitBalancingStats = emitBalancingStats;
     this.specificDataSourcesToKillUnusedSegmentsIn
         = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn);
     this.dataSourcesToNotKillStalePendingSegmentsIn
@@ -291,12 +261,6 @@ public class CoordinatorDynamicConfig
     return mergeBytesLimit;
   }
 
-  @JsonProperty
-  public boolean emitBalancingStats()
-  {
-    return emitBalancingStats;
-  }
-
   @JsonProperty
   public int getMergeSegmentsLimit()
   {
@@ -309,20 +273,6 @@ public class CoordinatorDynamicConfig
     return maxSegmentsToMove;
   }
 
-  @Deprecated
-  @JsonProperty
-  public double getPercentOfSegmentsToConsiderPerMove()
-  {
-    return percentOfSegmentsToConsiderPerMove;
-  }
-
-  @Deprecated
-  @JsonProperty
-  public boolean useBatchedSegmentSampler()
-  {
-    return useBatchedSegmentSampler;
-  }
-
   @JsonProperty
   public int getReplicantLifetime()
   {
@@ -452,12 +402,9 @@ public class CoordinatorDynamicConfig
            ", mergeBytesLimit=" + mergeBytesLimit +
            ", mergeSegmentsLimit=" + mergeSegmentsLimit +
            ", maxSegmentsToMove=" + maxSegmentsToMove +
-           ", percentOfSegmentsToConsiderPerMove=" + 
percentOfSegmentsToConsiderPerMove +
-           ", useBatchedSegmentSampler=" + useBatchedSegmentSampler +
            ", replicantLifetime=" + replicantLifetime +
            ", replicationThrottleLimit=" + replicationThrottleLimit +
            ", balancerComputeThreads=" + balancerComputeThreads +
-           ", emitBalancingStats=" + emitBalancingStats +
            ", specificDataSourcesToKillUnusedSegmentsIn=" + 
specificDataSourcesToKillUnusedSegmentsIn +
            ", dataSourcesToNotKillStalePendingSegmentsIn=" + 
dataSourcesToNotKillStalePendingSegmentsIn +
            ", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue +
@@ -485,11 +432,8 @@ public class CoordinatorDynamicConfig
            && mergeBytesLimit == that.mergeBytesLimit
            && mergeSegmentsLimit == that.mergeSegmentsLimit
            && maxSegmentsToMove == that.maxSegmentsToMove
-           && percentOfSegmentsToConsiderPerMove == 
that.percentOfSegmentsToConsiderPerMove
            && decommissioningMaxPercentOfMaxSegmentsToMove == 
that.decommissioningMaxPercentOfMaxSegmentsToMove
-           && useBatchedSegmentSampler == that.useBatchedSegmentSampler
            && balancerComputeThreads == that.balancerComputeThreads
-           && emitBalancingStats == that.emitBalancingStats
            && replicantLifetime == that.replicantLifetime
            && replicationThrottleLimit == that.replicationThrottleLimit
            && replicateAfterLoadTimeout == that.replicateAfterLoadTimeout
@@ -515,12 +459,9 @@ public class CoordinatorDynamicConfig
         mergeBytesLimit,
         mergeSegmentsLimit,
         maxSegmentsToMove,
-        percentOfSegmentsToConsiderPerMove,
-        useBatchedSegmentSampler,
         replicantLifetime,
         replicationThrottleLimit,
         balancerComputeThreads,
-        emitBalancingStats,
         maxSegmentsInNodeLoadingQueue,
         specificDataSourcesToKillUnusedSegmentsIn,
         dataSourcesToNotKillStalePendingSegmentsIn,
@@ -543,12 +484,10 @@ public class CoordinatorDynamicConfig
     static final long MERGE_BYTES_LIMIT = 524_288_000L;
     static final int MERGE_SEGMENTS_LIMIT = 100;
     static final int MAX_SEGMENTS_TO_MOVE = 100;
-    static final double PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE = 100;
     static final int REPLICANT_LIFETIME = 15;
     static final int REPLICATION_THROTTLE_LIMIT = 500;
     static final int BALANCER_COMPUTE_THREADS = 1;
     static final boolean EMIT_BALANCING_STATS = false;
-    static final boolean USE_BATCHED_SEGMENT_SAMPLER = true;
     static final int MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 500;
     static final int DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
     static final boolean PAUSE_COORDINATION = false;
@@ -564,11 +503,8 @@ public class CoordinatorDynamicConfig
     private Long mergeBytesLimit;
     private Integer mergeSegmentsLimit;
     private Integer maxSegmentsToMove;
-    private Double percentOfSegmentsToConsiderPerMove;
-    private Boolean useBatchedSegmentSampler;
     private Integer replicantLifetime;
     private Integer replicationThrottleLimit;
-    private Boolean emitBalancingStats;
     private Integer balancerComputeThreads;
     private Object specificDataSourcesToKillUnusedSegmentsIn;
     private Object dataSourcesToNotKillStalePendingSegmentsIn;
@@ -592,13 +528,9 @@ public class CoordinatorDynamicConfig
         @JsonProperty("mergeBytesLimit") @Nullable Long mergeBytesLimit,
         @JsonProperty("mergeSegmentsLimit") @Nullable Integer 
mergeSegmentsLimit,
         @JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove,
-        @Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove")
-        @Nullable Double percentOfSegmentsToConsiderPerMove,
-        @Deprecated @JsonProperty("useBatchedSegmentSampler") Boolean 
useBatchedSegmentSampler,
         @JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime,
         @JsonProperty("replicationThrottleLimit") @Nullable Integer 
replicationThrottleLimit,
         @JsonProperty("balancerComputeThreads") @Nullable Integer 
balancerComputeThreads,
-        @JsonProperty("emitBalancingStats") @Nullable Boolean 
emitBalancingStats,
         @JsonProperty("killDataSourceWhitelist") @Nullable Object 
specificDataSourcesToKillUnusedSegmentsIn,
         @JsonProperty("killPendingSegmentsSkipList") @Nullable Object 
dataSourcesToNotKillStalePendingSegmentsIn,
         @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer 
maxSegmentsInNodeLoadingQueue,
@@ -617,12 +549,9 @@ public class CoordinatorDynamicConfig
       this.mergeBytesLimit = mergeBytesLimit;
       this.mergeSegmentsLimit = mergeSegmentsLimit;
       this.maxSegmentsToMove = maxSegmentsToMove;
-      this.percentOfSegmentsToConsiderPerMove = 
percentOfSegmentsToConsiderPerMove;
-      this.useBatchedSegmentSampler = useBatchedSegmentSampler;
       this.replicantLifetime = replicantLifetime;
       this.replicationThrottleLimit = replicationThrottleLimit;
       this.balancerComputeThreads = balancerComputeThreads;
-      this.emitBalancingStats = emitBalancingStats;
       this.specificDataSourcesToKillUnusedSegmentsIn = 
specificDataSourcesToKillUnusedSegmentsIn;
       this.dataSourcesToNotKillStalePendingSegmentsIn = 
dataSourcesToNotKillStalePendingSegmentsIn;
       this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
@@ -660,20 +589,6 @@ public class CoordinatorDynamicConfig
       return this;
     }
 
-    @Deprecated
-    public Builder withPercentOfSegmentsToConsiderPerMove(double 
percentOfSegmentsToConsiderPerMove)
-    {
-      this.percentOfSegmentsToConsiderPerMove = 
percentOfSegmentsToConsiderPerMove;
-      return this;
-    }
-
-    @Deprecated
-    public Builder withUseBatchedSegmentSampler(boolean 
useBatchedSegmentSampler)
-    {
-      this.useBatchedSegmentSampler = useBatchedSegmentSampler;
-      return this;
-    }
-
     public Builder withSmartSegmentLoading(boolean smartSegmentLoading)
     {
       this.smartSegmentLoading = smartSegmentLoading;
@@ -704,12 +619,6 @@ public class CoordinatorDynamicConfig
       return this;
     }
 
-    public Builder withEmitBalancingStats(boolean emitBalancingStats)
-    {
-      this.emitBalancingStats = emitBalancingStats;
-      return this;
-    }
-
     public Builder withSpecificDataSourcesToKillUnusedSegmentsIn(Set<String> 
dataSources)
     {
       this.specificDataSourcesToKillUnusedSegmentsIn = dataSources;
@@ -772,12 +681,9 @@ public class CoordinatorDynamicConfig
           valueOrDefault(mergeBytesLimit, Defaults.MERGE_BYTES_LIMIT),
           valueOrDefault(mergeSegmentsLimit, Defaults.MERGE_SEGMENTS_LIMIT),
           valueOrDefault(maxSegmentsToMove, Defaults.MAX_SEGMENTS_TO_MOVE),
-          valueOrDefault(percentOfSegmentsToConsiderPerMove, 
Defaults.PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE),
-          valueOrDefault(useBatchedSegmentSampler, 
Defaults.USE_BATCHED_SEGMENT_SAMPLER),
           valueOrDefault(replicantLifetime, Defaults.REPLICANT_LIFETIME),
           valueOrDefault(replicationThrottleLimit, 
Defaults.REPLICATION_THROTTLE_LIMIT),
           valueOrDefault(balancerComputeThreads, 
Defaults.BALANCER_COMPUTE_THREADS),
-          valueOrDefault(emitBalancingStats, Defaults.EMIT_BALANCING_STATS),
           specificDataSourcesToKillUnusedSegmentsIn,
           dataSourcesToNotKillStalePendingSegmentsIn,
           valueOrDefault(maxSegmentsInNodeLoadingQueue, 
Defaults.MAX_SEGMENTS_IN_NODE_LOADING_QUEUE),
@@ -810,12 +716,9 @@ public class CoordinatorDynamicConfig
           valueOrDefault(mergeBytesLimit, defaults.getMergeBytesLimit()),
           valueOrDefault(mergeSegmentsLimit, defaults.getMergeSegmentsLimit()),
           valueOrDefault(maxSegmentsToMove, defaults.getMaxSegmentsToMove()),
-          valueOrDefault(percentOfSegmentsToConsiderPerMove, 
defaults.getPercentOfSegmentsToConsiderPerMove()),
-          valueOrDefault(useBatchedSegmentSampler, 
defaults.useBatchedSegmentSampler()),
           valueOrDefault(replicantLifetime, defaults.getReplicantLifetime()),
           valueOrDefault(replicationThrottleLimit, 
defaults.getReplicationThrottleLimit()),
           valueOrDefault(balancerComputeThreads, 
defaults.getBalancerComputeThreads()),
-          valueOrDefault(emitBalancingStats, defaults.emitBalancingStats()),
           valueOrDefault(specificDataSourcesToKillUnusedSegmentsIn, 
defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()),
           valueOrDefault(dataSourcesToNotKillStalePendingSegmentsIn, 
defaults.getDataSourcesToNotKillStalePendingSegmentsIn()),
           valueOrDefault(maxSegmentsInNodeLoadingQueue, 
defaults.getMaxSegmentsInNodeLoadingQueue()),
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSampler.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSampler.java
index 474347215a..defa77a6b6 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSampler.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSampler.java
@@ -43,6 +43,8 @@ public final class ReservoirSegmentSampler
    *
    * @param serverHolders        Set of historicals to consider for picking 
segments
    * @param maxSegmentsToPick    Maximum number of segments to pick
+   * @param segmentProvider      Function to extract all movable segments from 
a
+   *                             {@link ServerHolder}.
    * @param broadcastDatasources Segments belonging to these datasources will 
not
    *                             be picked for balancing, since they should be
    *                             loaded on all servers anyway.
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
index fbf867fd91..bb42e95eaa 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
@@ -39,7 +39,6 @@ public class SegmentLoadingConfig
   private final int percentDecommSegmentsToMove;
 
   private final boolean useRoundRobinSegmentAssignment;
-  private final boolean emitBalancingStats;
 
   /**
    * Creates a new SegmentLoadingConfig with recomputed coordinator config 
values from
@@ -69,11 +68,10 @@ public class SegmentLoadingConfig
           0,
           replicationThrottleLimit,
           Integer.MAX_VALUE,
-          dynamicConfig.getReplicantLifetime(),
+          60,
           maxSegmentsToMove,
           100,
-          true,
-          false
+          true
       );
     } else {
       // Use the configured values
@@ -84,8 +82,7 @@ public class SegmentLoadingConfig
           dynamicConfig.getReplicantLifetime(),
           dynamicConfig.getMaxSegmentsToMove(),
           dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove(),
-          dynamicConfig.isUseRoundRobinSegmentAssignment(),
-          dynamicConfig.emitBalancingStats()
+          dynamicConfig.isUseRoundRobinSegmentAssignment()
       );
     }
   }
@@ -97,8 +94,7 @@ public class SegmentLoadingConfig
       int maxLifetimeInLoadQueue,
       int maxSegmentsToMove,
       int percentDecommSegmentsToMove,
-      boolean useRoundRobinSegmentAssignment,
-      boolean emitBalancingStats
+      boolean useRoundRobinSegmentAssignment
   )
   {
     this.maxSegmentsInLoadQueue = maxSegmentsInLoadQueue;
@@ -108,7 +104,6 @@ public class SegmentLoadingConfig
     this.maxSegmentsToMove = maxSegmentsToMove;
     this.percentDecommSegmentsToMove = percentDecommSegmentsToMove;
     this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
-    this.emitBalancingStats = emitBalancingStats;
   }
 
   public int getMaxSegmentsInLoadQueue()
@@ -131,11 +126,6 @@ public class SegmentLoadingConfig
     return useRoundRobinSegmentAssignment;
   }
 
-  public boolean isEmitBalancingStats()
-  {
-    return emitBalancingStats;
-  }
-
   public int getMaxLifetimeInLoadQueue()
   {
     return maxLifetimeInLoadQueue;
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
index f8ea6cf98f..c67d84c681 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
@@ -89,7 +89,7 @@ public class CoordinatorRunStats
 
   /**
    * Builds a printable table of all the collected error, info and debug level
-   * stats (if applicable) with non-zero values.
+   * stats (if there are qualifying debugDimensions) with non-zero values.
    */
   public String buildStatsTable()
   {
@@ -183,13 +183,6 @@ public class CoordinatorRunStats
 
   public void add(CoordinatorStat stat, RowKey rowKey, long value)
   {
-    // Do not add a stat which will neither be emitted nor logged
-    if (!stat.shouldEmit()
-        && stat.getLevel() == CoordinatorStat.Level.DEBUG
-        && !hasDebugDimension(rowKey)) {
-      return;
-    }
-
     allStats.computeIfAbsent(rowKey, d -> new Object2LongOpenHashMap<>())
             .addTo(stat, value);
   }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorStat.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorStat.java
index a5d53f3735..cad6652ca1 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorStat.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorStat.java
@@ -29,27 +29,39 @@ public class CoordinatorStat
   private final Level level;
 
   /**
-   * Creates a new non-emitting, DEBUG level stat.
+   * Creates a new DEBUG level stat which is not emitted as a metric.
+   *
+   * @param shortName Unique name used while logging the stat
    */
-  public CoordinatorStat(String shortStatName)
+  public static CoordinatorStat toDebugOnly(String shortName)
   {
-    this(shortStatName, null, Level.DEBUG);
+    return new CoordinatorStat(shortName, null, Level.DEBUG);
   }
 
-  public CoordinatorStat(String shortName, Level level)
+  /**
+   * Creates a new DEBUG level stat which is also emitted as a metric.
+   *
+   * @param shortName  Unique name used while logging the stat
+   * @param metricName Name to be used when emitting this stat as a metric
+   */
+  public static CoordinatorStat toDebugAndEmit(String shortName, String 
metricName)
   {
-    this(shortName, null, level);
+    return new CoordinatorStat(shortName, metricName, Level.DEBUG);
   }
 
   /**
-   * Creates a new emitting, DEBUG level stat.
+   * Creates a new stat of the specified level, which is also emitted as a 
metric.
+   *
+   * @param shortName  Unique name used while logging the stat
+   * @param metricName Name to be used when emitting this stat as a metric
+   * @param level      Logging level for this stat
    */
-  public CoordinatorStat(String shortStatName, String metricName)
+  public static CoordinatorStat toLogAndEmit(String shortName, String 
metricName, Level level)
   {
-    this(shortStatName, metricName, Level.DEBUG);
+    return new CoordinatorStat(shortName, metricName, level);
   }
 
-  public CoordinatorStat(String shortStatName, String metricName, Level level)
+  private CoordinatorStat(String shortStatName, String metricName, Level level)
   {
     this.metricName = metricName;
     this.shortName = shortStatName;
@@ -57,18 +69,25 @@ public class CoordinatorStat
   }
 
   /**
-   * Name of the metric emitted for this stat, if any.
+   * @return Metric name to be used when emitting this stat. {@code null} if
+   * this stat should not be emitted.
    */
   public String getMetricName()
   {
     return metricName;
   }
 
+  /**
+   * Unique name used while logging this stat.
+   */
   public String getShortName()
   {
     return shortName;
   }
 
+  /**
+   * Level of this stat, typically used for logging.
+   */
   public Level getLevel()
   {
     return level;
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
index 9864aa6b3a..28f5c91049 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
@@ -28,114 +28,115 @@ public class Stats
   {
     // Decisions taken in a run
     public static final CoordinatorStat ASSIGNED
-        = new CoordinatorStat("assigned", "segment/assigned/count", 
CoordinatorStat.Level.INFO);
+        = CoordinatorStat.toLogAndEmit("assigned", "segment/assigned/count", 
CoordinatorStat.Level.INFO);
     public static final CoordinatorStat DROPPED
-        = new CoordinatorStat("dropped", "segment/dropped/count", 
CoordinatorStat.Level.INFO);
+        = CoordinatorStat.toLogAndEmit("dropped", "segment/dropped/count", 
CoordinatorStat.Level.INFO);
     public static final CoordinatorStat DELETED
-        = new CoordinatorStat("deleted", "segment/deleted/count", 
CoordinatorStat.Level.INFO);
+        = CoordinatorStat.toLogAndEmit("deleted", "segment/deleted/count", 
CoordinatorStat.Level.INFO);
     public static final CoordinatorStat MOVED
-        = new CoordinatorStat("moved", "segment/moved/count");
+        = CoordinatorStat.toDebugAndEmit("moved", "segment/moved/count");
 
     // Skipped decisions in a run
     public static final CoordinatorStat ASSIGN_SKIPPED
-        = new CoordinatorStat("assignSkip", "segment/assignSkipped/count");
+        = CoordinatorStat.toDebugAndEmit("assignSkip", 
"segment/assignSkipped/count");
     public static final CoordinatorStat DROP_SKIPPED
-        = new CoordinatorStat("dropSkip", "segment/dropSkipped/count");
+        = CoordinatorStat.toDebugAndEmit("dropSkip", 
"segment/dropSkipped/count");
     public static final CoordinatorStat MOVE_SKIPPED
-        = new CoordinatorStat("moveSkip", "segment/moveSkipped/count");
+        = CoordinatorStat.toDebugAndEmit("moveSkip", 
"segment/moveSkipped/count");
 
     // Current state of segments of a datasource
     public static final CoordinatorStat USED
-        = new CoordinatorStat("usedSegments", "segment/count");
+        = CoordinatorStat.toDebugAndEmit("usedSegments", "segment/count");
     public static final CoordinatorStat USED_BYTES
-        = new CoordinatorStat("usedSegmentBytes", "segment/size");
+        = CoordinatorStat.toDebugAndEmit("usedSegmentBytes", "segment/size");
     public static final CoordinatorStat UNDER_REPLICATED
-        = new CoordinatorStat("underreplicated", 
"segment/underReplicated/count");
+        = CoordinatorStat.toDebugAndEmit("underreplicated", 
"segment/underReplicated/count");
     public static final CoordinatorStat UNAVAILABLE
-        = new CoordinatorStat("unavailable", "segment/unavailable/count");
+        = CoordinatorStat.toDebugAndEmit("unavailable", 
"segment/unavailable/count");
     public static final CoordinatorStat UNNEEDED
-        = new CoordinatorStat("unneeded", "segment/unneeded/count");
+        = CoordinatorStat.toDebugAndEmit("unneeded", "segment/unneeded/count");
     public static final CoordinatorStat OVERSHADOWED
-        = new CoordinatorStat("overshadowed", "segment/overshadowed/count");
+        = CoordinatorStat.toDebugAndEmit("overshadowed", 
"segment/overshadowed/count");
   }
 
   public static class SegmentQueue
   {
     public static final CoordinatorStat NUM_TO_LOAD
-        = new CoordinatorStat("numToLoad", "segment/loadQueue/count");
+        = CoordinatorStat.toDebugAndEmit("numToLoad", 
"segment/loadQueue/count");
     public static final CoordinatorStat BYTES_TO_LOAD
-        = new CoordinatorStat("bytesToLoad", "segment/loadQueue/size");
+        = CoordinatorStat.toDebugAndEmit("bytesToLoad", 
"segment/loadQueue/size");
     public static final CoordinatorStat NUM_TO_DROP
-        = new CoordinatorStat("numToDrop", "segment/dropQueue/count");
+        = CoordinatorStat.toDebugAndEmit("numToDrop", 
"segment/dropQueue/count");
 
     public static final CoordinatorStat ASSIGNED_ACTIONS
-        = new CoordinatorStat("assignedActions", "segment/loadQueue/assigned");
+        = CoordinatorStat.toDebugAndEmit("assignedActions", 
"segment/loadQueue/assigned");
     public static final CoordinatorStat COMPLETED_ACTIONS
-        = new CoordinatorStat("successActions", "segment/loadQueue/success");
+        = CoordinatorStat.toDebugAndEmit("successActions", 
"segment/loadQueue/success");
     public static final CoordinatorStat FAILED_ACTIONS
-        = new CoordinatorStat("failedActions", "segment/loadQueue/failed", 
CoordinatorStat.Level.ERROR);
+        = CoordinatorStat.toLogAndEmit("failedActions", 
"segment/loadQueue/failed", CoordinatorStat.Level.ERROR);
     public static final CoordinatorStat CANCELLED_ACTIONS
-        = new CoordinatorStat("cancelledActions", 
"segment/loadQueue/cancelled");
+        = CoordinatorStat.toDebugAndEmit("cancelledActions", 
"segment/loadQueue/cancelled");
   }
 
   public static class Tier
   {
     public static final CoordinatorStat REQUIRED_CAPACITY
-        = new CoordinatorStat("reqdCap", "tier/required/capacity");
+        = CoordinatorStat.toDebugAndEmit("reqdCap", "tier/required/capacity");
     public static final CoordinatorStat TOTAL_CAPACITY
-        = new CoordinatorStat("totalCap", "tier/total/capacity");
+        = CoordinatorStat.toDebugAndEmit("totalCap", "tier/total/capacity");
     public static final CoordinatorStat REPLICATION_FACTOR
-        = new CoordinatorStat("maxRepFactor", "tier/replication/factor");
+        = CoordinatorStat.toDebugAndEmit("maxRepFactor", 
"tier/replication/factor");
     public static final CoordinatorStat HISTORICAL_COUNT
-        = new CoordinatorStat("numHistorical", "tier/historical/count");
+        = CoordinatorStat.toDebugAndEmit("numHistorical", 
"tier/historical/count");
   }
 
   public static class Compaction
   {
     public static final CoordinatorStat SUBMITTED_TASKS
-        = new CoordinatorStat("compactTasks", "compact/task/count");
+        = CoordinatorStat.toDebugAndEmit("compactTasks", "compact/task/count");
     public static final CoordinatorStat MAX_SLOTS
-        = new CoordinatorStat("compactMaxSlots", "compactTask/maxSlot/count");
+        = CoordinatorStat.toDebugAndEmit("compactMaxSlots", 
"compactTask/maxSlot/count");
     public static final CoordinatorStat AVAILABLE_SLOTS
-        = new CoordinatorStat("compactAvlSlots", 
"compactTask/availableSlot/count");
+        = CoordinatorStat.toDebugAndEmit("compactAvlSlots", 
"compactTask/availableSlot/count");
 
     public static final CoordinatorStat PENDING_BYTES
-        = new CoordinatorStat("compactPendingBytes", 
"segment/waitCompact/bytes");
+        = CoordinatorStat.toDebugAndEmit("compactPendingBytes", 
"segment/waitCompact/bytes");
     public static final CoordinatorStat COMPACTED_BYTES
-        = new CoordinatorStat("compactedBytes", "segment/compacted/bytes");
+        = CoordinatorStat.toDebugAndEmit("compactedBytes", 
"segment/compacted/bytes");
     public static final CoordinatorStat SKIPPED_BYTES
-        = new CoordinatorStat("compactSkipBytes", "segment/skipCompact/bytes");
+        = CoordinatorStat.toDebugAndEmit("compactSkipBytes", 
"segment/skipCompact/bytes");
 
     public static final CoordinatorStat PENDING_SEGMENTS
-        = new CoordinatorStat("compactPendingSeg", 
"segment/waitCompact/count");
+        = CoordinatorStat.toDebugAndEmit("compactPendingSeg", 
"segment/waitCompact/count");
     public static final CoordinatorStat COMPACTED_SEGMENTS
-        = new CoordinatorStat("compactedSeg", "segment/compacted/count");
+        = CoordinatorStat.toDebugAndEmit("compactedSeg", 
"segment/compacted/count");
     public static final CoordinatorStat SKIPPED_SEGMENTS
-        = new CoordinatorStat("compactSkipSeg", "segment/skipCompact/count");
+        = CoordinatorStat.toDebugAndEmit("compactSkipSeg", 
"segment/skipCompact/count");
 
     public static final CoordinatorStat PENDING_INTERVALS
-        = new CoordinatorStat("compactPendingIntv", 
"interval/waitCompact/count");
+        = CoordinatorStat.toDebugAndEmit("compactPendingIntv", 
"interval/waitCompact/count");
     public static final CoordinatorStat COMPACTED_INTERVALS
-        = new CoordinatorStat("compactedIntv", "interval/compacted/count");
+        = CoordinatorStat.toDebugAndEmit("compactedIntv", 
"interval/compacted/count");
     public static final CoordinatorStat SKIPPED_INTERVALS
-        = new CoordinatorStat("compactSkipIntv", "interval/skipCompact/count");
+        = CoordinatorStat.toDebugAndEmit("compactSkipIntv", 
"interval/skipCompact/count");
   }
 
   public static class CoordinatorRun
   {
     public static final CoordinatorStat DUTY_RUN_TIME
-        = new CoordinatorStat("dutyRunTime", "coordinator/time");
+        = CoordinatorStat.toDebugAndEmit("dutyRunTime", "coordinator/time");
     public static final CoordinatorStat GROUP_RUN_TIME
-        = new CoordinatorStat("groupRunTime", "coordinator/global/time");
+        = CoordinatorStat.toDebugAndEmit("groupRunTime", 
"coordinator/global/time");
   }
 
   public static class Balancer
   {
-    public static final CoordinatorStat COMPUTATION_ERRORS
-        = new CoordinatorStat("costComputeError", 
"segment/balancer/compute/error");
-    public static final CoordinatorStat COMPUTATION_TIME
-        = new CoordinatorStat("costComputeTime", 
"segment/balancer/compute/time");
-    public static final CoordinatorStat COMPUTATION_COUNT
-        = new CoordinatorStat("costComputeCount", 
"segment/balancer/compute/count");
+    public static final CoordinatorStat COMPUTATION_ERRORS = 
CoordinatorStat.toLogAndEmit(
+        "costComputeError",
+        "segment/balancer/compute/error",
+        CoordinatorStat.Level.ERROR
+    );
+    public static final CoordinatorStat COMPUTATION_TIME = 
CoordinatorStat.toDebugOnly("costComputeTime");
+    public static final CoordinatorStat COMPUTATION_COUNT = 
CoordinatorStat.toDebugOnly("costComputeCount");
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
index c2c77f02cb..1bf111a4f4 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
@@ -34,15 +34,6 @@ import java.util.Map;
 
 public class CoordinatorRunStatsTest
 {
-  private static final CoordinatorStat STAT_1 = new CoordinatorStat("stat1", 
"s1");
-  private static final CoordinatorStat STAT_2 = new CoordinatorStat("stat2", 
"s2");
-  private static final CoordinatorStat STAT_3 = new CoordinatorStat("stat3", 
"s3");
-
-  private static final CoordinatorStat DEBUG_STAT_1
-      = new CoordinatorStat("debugStat1", CoordinatorStat.Level.DEBUG);
-  private static final CoordinatorStat DEBUG_STAT_2
-      = new CoordinatorStat("debugStat2", CoordinatorStat.Level.DEBUG);
-
   private CoordinatorRunStats stats;
 
   @Before
@@ -60,56 +51,56 @@ public class CoordinatorRunStatsTest
   @Test
   public void testAdd()
   {
-    Assert.assertEquals(0, stats.get(STAT_1));
-    stats.add(STAT_1, 1);
-    Assert.assertEquals(1, stats.get(STAT_1));
-    stats.add(STAT_1, -11);
-    Assert.assertEquals(-10, stats.get(STAT_1));
+    Assert.assertEquals(0, stats.get(Stat.ERROR_1));
+    stats.add(Stat.ERROR_1, 1);
+    Assert.assertEquals(1, stats.get(Stat.ERROR_1));
+    stats.add(Stat.ERROR_1, -11);
+    Assert.assertEquals(-10, stats.get(Stat.ERROR_1));
   }
 
   @Test
   public void testAddForRowKey()
   {
-    stats.add(STAT_1, Key.TIER_1, 1);
-    stats.add(STAT_1, Key.TIER_2, 1);
-    stats.add(STAT_1, Key.TIER_1, -5);
-    stats.add(STAT_2, Key.TIER_1, 1);
-    stats.add(STAT_1, Key.TIER_2, 1);
+    stats.add(Stat.ERROR_1, Key.TIER_1, 1);
+    stats.add(Stat.ERROR_1, Key.TIER_2, 1);
+    stats.add(Stat.ERROR_1, Key.TIER_1, -5);
+    stats.add(Stat.INFO_1, Key.TIER_1, 1);
+    stats.add(Stat.ERROR_1, Key.TIER_2, 1);
 
-    Assert.assertFalse(stats.hasStat(STAT_3));
+    Assert.assertFalse(stats.hasStat(Stat.INFO_2));
 
-    Assert.assertEquals(-4, stats.get(STAT_1, Key.TIER_1));
-    Assert.assertEquals(2, stats.get(STAT_1, Key.TIER_2));
-    Assert.assertEquals(1, stats.get(STAT_2, Key.TIER_1));
+    Assert.assertEquals(-4, stats.get(Stat.ERROR_1, Key.TIER_1));
+    Assert.assertEquals(2, stats.get(Stat.ERROR_1, Key.TIER_2));
+    Assert.assertEquals(1, stats.get(Stat.INFO_1, Key.TIER_1));
   }
 
   @Test
   public void testGetSnapshotAndReset()
   {
-    stats.add(STAT_1, 1);
-    stats.add(STAT_2, 3);
-    stats.add(STAT_1, Key.TIER_1, 5);
-    stats.add(STAT_1, Key.DUTY_1, 7);
+    stats.add(Stat.ERROR_1, 1);
+    stats.add(Stat.INFO_1, 3);
+    stats.add(Stat.ERROR_1, Key.TIER_1, 5);
+    stats.add(Stat.ERROR_1, Key.DUTY_1, 7);
 
     final CoordinatorRunStats firstFlush = stats.getSnapshotAndReset();
-    Assert.assertEquals(1, firstFlush.get(STAT_1));
-    Assert.assertEquals(3, firstFlush.get(STAT_2));
-    Assert.assertEquals(5, firstFlush.get(STAT_1, Key.TIER_1));
-    Assert.assertEquals(7, firstFlush.get(STAT_1, Key.DUTY_1));
+    Assert.assertEquals(1, firstFlush.get(Stat.ERROR_1));
+    Assert.assertEquals(3, firstFlush.get(Stat.INFO_1));
+    Assert.assertEquals(5, firstFlush.get(Stat.ERROR_1, Key.TIER_1));
+    Assert.assertEquals(7, firstFlush.get(Stat.ERROR_1, Key.DUTY_1));
 
     Assert.assertEquals(0, stats.rowCount());
 
-    stats.add(STAT_1, 7);
-    stats.add(STAT_1, Key.TIER_1, 5);
-    stats.add(STAT_2, Key.DUTY_1, 3);
-    stats.add(STAT_3, Key.TIER_1, 1);
+    stats.add(Stat.ERROR_1, 7);
+    stats.add(Stat.ERROR_1, Key.TIER_1, 5);
+    stats.add(Stat.INFO_1, Key.DUTY_1, 3);
+    stats.add(Stat.INFO_2, Key.TIER_1, 1);
 
     final CoordinatorRunStats secondFlush = stats.getSnapshotAndReset();
 
-    Assert.assertEquals(7, secondFlush.get(STAT_1));
-    Assert.assertEquals(5, secondFlush.get(STAT_1, Key.TIER_1));
-    Assert.assertEquals(3, secondFlush.get(STAT_2, Key.DUTY_1));
-    Assert.assertEquals(1, secondFlush.get(STAT_3, Key.TIER_1));
+    Assert.assertEquals(7, secondFlush.get(Stat.ERROR_1));
+    Assert.assertEquals(5, secondFlush.get(Stat.ERROR_1, Key.TIER_1));
+    Assert.assertEquals(3, secondFlush.get(Stat.INFO_1, Key.DUTY_1));
+    Assert.assertEquals(1, secondFlush.get(Stat.INFO_2, Key.TIER_1));
 
     Assert.assertEquals(0, stats.rowCount());
   }
@@ -117,38 +108,38 @@ public class CoordinatorRunStatsTest
   @Test
   public void testUpdateMax()
   {
-    stats.updateMax(STAT_1, Key.TIER_1, 2);
-    stats.updateMax(STAT_1, Key.TIER_1, 6);
-    stats.updateMax(STAT_1, Key.TIER_1, 5);
+    stats.updateMax(Stat.ERROR_1, Key.TIER_1, 2);
+    stats.updateMax(Stat.ERROR_1, Key.TIER_1, 6);
+    stats.updateMax(Stat.ERROR_1, Key.TIER_1, 5);
 
-    stats.updateMax(STAT_2, Key.TIER_1, 5);
-    stats.updateMax(STAT_2, Key.TIER_1, 4);
-    stats.updateMax(STAT_2, Key.TIER_1, 5);
+    stats.updateMax(Stat.INFO_1, Key.TIER_1, 5);
+    stats.updateMax(Stat.INFO_1, Key.TIER_1, 4);
+    stats.updateMax(Stat.INFO_1, Key.TIER_1, 5);
 
-    stats.updateMax(STAT_1, Key.TIER_2, 7);
-    stats.updateMax(STAT_1, Key.TIER_2, 9);
-    stats.updateMax(STAT_1, Key.TIER_2, 10);
+    stats.updateMax(Stat.ERROR_1, Key.TIER_2, 7);
+    stats.updateMax(Stat.ERROR_1, Key.TIER_2, 9);
+    stats.updateMax(Stat.ERROR_1, Key.TIER_2, 10);
 
-    Assert.assertFalse(stats.hasStat(STAT_3));
+    Assert.assertFalse(stats.hasStat(Stat.INFO_2));
 
-    Assert.assertEquals(6, stats.get(STAT_1, Key.TIER_1));
-    Assert.assertEquals(5, stats.get(STAT_2, Key.TIER_1));
-    Assert.assertEquals(10, stats.get(STAT_1, Key.TIER_2));
+    Assert.assertEquals(6, stats.get(Stat.ERROR_1, Key.TIER_1));
+    Assert.assertEquals(5, stats.get(Stat.INFO_1, Key.TIER_1));
+    Assert.assertEquals(10, stats.get(Stat.ERROR_1, Key.TIER_2));
   }
 
   @Test
   public void testAddToDutyStat()
   {
-    stats.add(STAT_1, Key.DUTY_1, 1);
-    stats.add(STAT_1, Key.DUTY_2, 1);
-    stats.add(STAT_1, Key.DUTY_1, -5);
-    stats.add(STAT_2, Key.DUTY_1, 1);
-    stats.add(STAT_1, Key.DUTY_2, 1);
-
-    Assert.assertFalse(stats.hasStat(STAT_3));
-    Assert.assertEquals(-4, stats.get(STAT_1, Key.DUTY_1));
-    Assert.assertEquals(2, stats.get(STAT_1, Key.DUTY_2));
-    Assert.assertEquals(1, stats.get(STAT_2, Key.DUTY_1));
+    stats.add(Stat.ERROR_1, Key.DUTY_1, 1);
+    stats.add(Stat.ERROR_1, Key.DUTY_2, 1);
+    stats.add(Stat.ERROR_1, Key.DUTY_1, -5);
+    stats.add(Stat.INFO_1, Key.DUTY_1, 1);
+    stats.add(Stat.ERROR_1, Key.DUTY_2, 1);
+
+    Assert.assertFalse(stats.hasStat(Stat.INFO_2));
+    Assert.assertEquals(-4, stats.get(Stat.ERROR_1, Key.DUTY_1));
+    Assert.assertEquals(2, stats.get(Stat.ERROR_1, Key.DUTY_2));
+    Assert.assertEquals(1, stats.get(Stat.INFO_1, Key.DUTY_1));
   }
 
   @Test
@@ -161,13 +152,13 @@ public class CoordinatorRunStatsTest
     );
     expected.forEach(
         (duty, count) ->
-            stats.add(STAT_1, RowKey.of(Dimension.DUTY, duty), count)
+            stats.add(Stat.ERROR_1, RowKey.of(Dimension.DUTY, duty), count)
     );
 
     final Map<String, Long> actual = new HashMap<>();
     stats.forEachStat(
         (stat, rowKey, value) -> {
-          if (stat.equals(STAT_1)) {
+          if (stat.equals(Stat.ERROR_1)) {
             actual.put(rowKey.getValues().get(Dimension.DUTY), value);
           }
         }
@@ -176,24 +167,36 @@ public class CoordinatorRunStatsTest
   }
 
   @Test
-  public void testAddWithDebugDimensions()
+  public void testBuildStatsTable()
   {
-    stats.add(DEBUG_STAT_1, 1);
-    Assert.assertFalse(stats.hasStat(DEBUG_STAT_1));
+    stats.add(Stat.ERROR_1, Key.DUTY_1, 10);
+    stats.add(Stat.INFO_1, Key.DUTY_1, 20);
+    stats.add(Stat.DEBUG_1, Key.DUTY_1, 30);
 
-    stats.add(DEBUG_STAT_1, Key.TIER_1, 1);
-    Assert.assertFalse(stats.hasStat(DEBUG_STAT_1));
+    final String expectedTable
+        = "\nError: {duty=duty1} ==> {error1=10}"
+          + "\nInfo : {duty=duty1} ==> {info1=20}"
+          + "\nDebug: 1 hidden stats. Set 'debugDimensions' to see these."
+          + "\nTOTAL: 3 stats for 1 dimension keys";
 
-    final CoordinatorRunStats debugStats
-        = new CoordinatorRunStats(Key.TIER_1.getValues());
-    debugStats.add(DEBUG_STAT_1, 1);
-    Assert.assertFalse(stats.hasStat(DEBUG_STAT_1));
-
-    debugStats.add(DEBUG_STAT_1, Key.TIER_1, 1);
-    Assert.assertTrue(debugStats.hasStat(DEBUG_STAT_1));
+    Assert.assertEquals(expectedTable, stats.buildStatsTable());
+  }
 
-    debugStats.add(DEBUG_STAT_2, RowKey.of(Dimension.DATASOURCE, "wiki"), 1);
-    Assert.assertFalse(debugStats.hasStat(DEBUG_STAT_2));
+  @Test
+  public void testBuildStatsTableWithDebugDimensions()
+  {
+    final CoordinatorRunStats debugStats = new 
CoordinatorRunStats(Key.DUTY_1.getValues());
+    debugStats.add(Stat.ERROR_1, Key.DUTY_1, 10);
+    debugStats.add(Stat.INFO_1, Key.DUTY_1, 20);
+    debugStats.add(Stat.DEBUG_1, Key.DUTY_1, 30);
+
+    final String expectedTable
+        = "\nError: {duty=duty1} ==> {error1=10}"
+          + "\nInfo : {duty=duty1} ==> {info1=20}"
+          + "\nDebug: {duty=duty1} ==> {debug1=30}"
+          + "\nTOTAL: 3 stats for 1 dimension keys";
+
+    Assert.assertEquals(expectedTable, debugStats.buildStatsTable());
   }
 
   /**
@@ -208,4 +211,15 @@ public class CoordinatorRunStatsTest
     static final RowKey DUTY_2 = RowKey.of(Dimension.DUTY, "duty2");
   }
 
+  private static class Stat
+  {
+    static final CoordinatorStat ERROR_1
+        = CoordinatorStat.toLogAndEmit("error1", "e1", 
CoordinatorStat.Level.ERROR);
+    static final CoordinatorStat INFO_1
+        = CoordinatorStat.toLogAndEmit("info1", "i1", 
CoordinatorStat.Level.INFO);
+    static final CoordinatorStat INFO_2
+        = CoordinatorStat.toLogAndEmit("info2", "i2", 
CoordinatorStat.Level.INFO);
+    static final CoordinatorStat DEBUG_1
+        = CoordinatorStat.toDebugAndEmit("debug1", "d1");
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
index 39dcc9ce50..185f8e8263 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
@@ -339,8 +339,6 @@ public class BalanceSegmentsTest
                 CoordinatorDynamicConfig.builder()
                                         .withSmartSegmentLoading(false)
                                         .withMaxSegmentsToMove(1)
-                                        .withUseBatchedSegmentSampler(true)
-                                        
.withPercentOfSegmentsToConsiderPerMove(40)
                                         .build()
             )
             .withBalancerStrategy(balancerStrategy)
@@ -355,7 +353,7 @@ public class BalanceSegmentsTest
   }
 
   @Test
-  public void testUseBatchedSegmentSampler()
+  public void testMoveForMultipleDatasources()
   {
     DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
         createHolder(server1, allSegments),
@@ -367,7 +365,6 @@ public class BalanceSegmentsTest
             CoordinatorDynamicConfig.builder()
                                     .withSmartSegmentLoading(false)
                                     .withMaxSegmentsToMove(2)
-                                    .withUseBatchedSegmentSampler(true)
                                     .build()
         )
         .withBroadcastDatasources(broadcastDatasources)
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index e64ab0bcc4..c9343da85a 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -46,11 +46,9 @@ import java.util.Map;
  * the simulation. {@link CoordinatorSimulation#stop()} should not be called as
  * the simulation is stopped when cleaning up after the test in {@link 
#tearDown()}.
  * <p>
- * Tests that verify balancing behaviour should set
- * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+ * Tests that verify balancing behaviour use batched segment sampling.
  * Otherwise, the segment sampling is random and can produce repeated values
- * leading to flakiness in the tests. The simulation sets this field to true by
- * default.
+ * leading to flakiness in the tests.
  */
 public abstract class CoordinatorSimulationBaseTest implements
     CoordinatorSimulation.CoordinatorState,
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index bd9be366ae..43f7bd9872 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -151,11 +151,8 @@ public class CoordinatorSimulationBuilder
   /**
    * Specifies the CoordinatorDynamicConfig to be used in the simulation.
    * <p>
-   * Default values: {@code useBatchedSegmentSampler = true}, other params as
-   * specified in {@link CoordinatorDynamicConfig.Builder}.
+   * Default values: as specified in {@link CoordinatorDynamicConfig.Builder}.
    * <p>
-   * Tests that verify balancing behaviour must set
-   * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+   * Tests that verify balancing behaviour use batched segment sampling.
    * Otherwise, the segment sampling is random and can produce repeated values
-   * leading to flakiness in the tests. The simulation sets this field to true by
-   * default.
+   * leading to flakiness in the tests.
diff --git 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index 2b81a5c918..a7744256d5 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -46,11 +46,9 @@ public class CoordinatorDynamicConfigTest
                      + "  \"mergeBytesLimit\": 1,\n"
                      + "  \"mergeSegmentsLimit\" : 1,\n"
                      + "  \"maxSegmentsToMove\": 1,\n"
-                     + "  \"percentOfSegmentsToConsiderPerMove\": 1,\n"
                      + "  \"replicantLifetime\": 1,\n"
                      + "  \"replicationThrottleLimit\": 1,\n"
                      + "  \"balancerComputeThreads\": 2, \n"
-                     + "  \"emitBalancingStats\": true,\n"
                      + "  \"killDataSourceWhitelist\": 
[\"test1\",\"test2\"],\n"
                      + "  \"maxSegmentsInNodeLoadingQueue\": 1,\n"
                      + "  \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
@@ -78,11 +76,8 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        true,
-        1,
         1,
         2,
-        true,
         whitelist,
         false,
         1,
@@ -101,11 +96,8 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        true,
-        1,
         1,
         2,
-        true,
         whitelist,
         false,
         1,
@@ -124,11 +116,8 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        true,
-        1,
         1,
         2,
-        true,
         whitelist,
         false,
         1,
@@ -147,37 +136,8 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        true,
-        1,
         1,
         2,
-        true,
-        whitelist,
-        false,
-        1,
-        ImmutableSet.of("host1"),
-        5,
-        true,
-        false,
-        Integer.MAX_VALUE
-    );
-
-    actual = CoordinatorDynamicConfig.builder()
-                                     
.withPercentOfSegmentsToConsiderPerMove(10)
-                                     .withUseBatchedSegmentSampler(false)
-                                     .build(actual);
-    assertConfig(
-        actual,
-        1,
-        1,
-        1,
-        1,
-        10,
-        false,
-        1,
-        1,
-        2,
-        true,
         whitelist,
         false,
         1,
@@ -195,12 +155,9 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        10,
-        false,
         1,
         1,
         2,
-        true,
         whitelist,
         false,
         1,
@@ -218,12 +175,9 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        10,
-        false,
         1,
         1,
         2,
-        true,
         whitelist,
         false,
         1,
@@ -235,6 +189,19 @@ public class CoordinatorDynamicConfigTest
     );
   }
 
+  @Test
+  public void testDeserializationWithUnknownProperties() throws Exception
+  {
+    String jsonStr = "{\n"
+                     + "  \"unknownProperty\": 2, \n"
+                     + "  \"maxSegmentsInNodeLoadingQueue\": 15\n"
+                     + "}\n";
+
+    CoordinatorDynamicConfig dynamicConfig
+        = mapper.readValue(jsonStr, CoordinatorDynamicConfig.class);
+    Assert.assertEquals(15, dynamicConfig.getMaxSegmentsInNodeLoadingQueue());
+  }
+
   @Test
   public void 
testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources()
   {
@@ -243,12 +210,9 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        null,
-        false,
         1,
         2,
         10,
-        true,
         null,
         null,
         null,
@@ -273,12 +237,9 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        null,
-        false,
         1,
         2,
         10,
-        true,
         ImmutableSet.of("test1"),
         null,
         null,
@@ -306,7 +267,6 @@ public class CoordinatorDynamicConfigTest
                      + "  \"replicantLifetime\": 1,\n"
                      + "  \"replicationThrottleLimit\": 1,\n"
                      + "  \"balancerComputeThreads\": 2, \n"
-                     + "  \"emitBalancingStats\": true,\n"
                      + "  \"killDataSourceWhitelist\": 
[\"test1\",\"test2\"],\n"
                      + "  \"maxSegmentsInNodeLoadingQueue\": 1\n"
                      + "}\n";
@@ -328,11 +288,9 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        100,
-        true, 1,
+        1,
         1,
         2,
-        true,
         whitelist,
         false,
         1,
@@ -350,12 +308,9 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        100,
-        true,
         1,
         1,
         2,
-        true,
         whitelist,
         false,
         1,
@@ -373,12 +328,9 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        100,
-        true,
         1,
         1,
         2,
-        true,
         whitelist,
         false,
         1,
@@ -391,18 +343,16 @@ public class CoordinatorDynamicConfigTest
   }
 
   @Test
-  public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception
+  public void testSerdeWithStringInKillDataSourceWhitelist() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"millisToWaitBeforeDeleting\": 1,\n"
                      + "  \"mergeBytesLimit\": 1,\n"
                      + "  \"mergeSegmentsLimit\" : 1,\n"
                      + "  \"maxSegmentsToMove\": 1,\n"
-                     + "  \"percentOfSegmentsToConsiderPerMove\": 1,\n"
                      + "  \"replicantLifetime\": 1,\n"
                      + "  \"replicationThrottleLimit\": 1,\n"
                      + "  \"balancerComputeThreads\": 2, \n"
-                     + "  \"emitBalancingStats\": true,\n"
                      + "  \"killDataSourceWhitelist\": \"test1, test2\", \n"
                      + "  \"maxSegmentsInNodeLoadingQueue\": 1\n"
                      + "}\n";
@@ -423,11 +373,8 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        true,
-        1,
         1,
         2,
-        true,
         ImmutableSet.of("test1", "test2"),
         false,
         1,
@@ -440,19 +387,15 @@ public class CoordinatorDynamicConfigTest
   }
 
   @Test
-  public void testSerdeHandleInvalidPercentOfSegmentsToConsiderPerMove()
+  public void testSerdeHandlesInvalidDecommissioningPercentToMove()
   {
-    final String errorMsg = "'percentOfSegmentsToConsiderPerMove' should be 
between 1 and 100";
+    final String errorMsg = "'decommissioningMaxPercentOfMaxSegmentsToMove' 
should be in range [0, 100]";
     assertThatDeserializationFailsWithMessage(
-        "{\"percentOfSegmentsToConsiderPerMove\": 0}",
+        "{\"decommissioningMaxPercentOfMaxSegmentsToMove\": -1}",
         errorMsg
     );
     assertThatDeserializationFailsWithMessage(
-        "{\"percentOfSegmentsToConsiderPerMove\": -100}",
-        errorMsg
-    );
-    assertThatDeserializationFailsWithMessage(
-        "{\"percentOfSegmentsToConsiderPerMove\": 105}",
+        "{\"decommissioningMaxPercentOfMaxSegmentsToMove\": 105}",
         errorMsg
     );
   }
@@ -468,7 +411,6 @@ public class CoordinatorDynamicConfigTest
                      + "  \"replicantLifetime\": 1,\n"
                      + "  \"replicationThrottleLimit\": 1,\n"
                      + "  \"balancerComputeThreads\": 2, \n"
-                     + "  \"emitBalancingStats\": true,\n"
                      + "  \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
                      + "  \"maxSegmentsInNodeLoadingQueue\": 1,\n"
                      + "  \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
@@ -489,12 +431,9 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        100,
-        true,
         1,
         1,
         2,
-        true,
         whitelist,
         false,
         1,
@@ -514,11 +453,9 @@ public class CoordinatorDynamicConfigTest
                      + "  \"mergeBytesLimit\": 1,\n"
                      + "  \"mergeSegmentsLimit\" : 1,\n"
                      + "  \"maxSegmentsToMove\": 1,\n"
-                     + "  \"percentOfSegmentsToConsiderPerMove\": 1,\n"
                      + "  \"replicantLifetime\": 1,\n"
                      + "  \"replicationThrottleLimit\": 1,\n"
                      + "  \"balancerComputeThreads\": 2, \n"
-                     + "  \"emitBalancingStats\": true,\n"
                      + "  \"killAllDataSources\": true,\n"
                      + "  \"maxSegmentsInNodeLoadingQueue\": 1\n"
                      + "}\n";
@@ -537,11 +474,8 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        true,
-        1,
         1,
         2,
-        true,
         ImmutableSet.of(),
         true,
         1,
@@ -557,8 +491,7 @@ public class CoordinatorDynamicConfigTest
     // so this is a valid config
     jsonStr = "{\n"
               + "  \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
-              + "  \"killAllDataSources\": true,\n"
-              + "  \"percentOfSegmentsToConsiderPerMove\": 1\n"
+              + "  \"killAllDataSources\": true\n"
               + "}\n";
     actual = mapper.readValue(jsonStr, CoordinatorDynamicConfig.class);
 
@@ -574,11 +507,9 @@ public class CoordinatorDynamicConfigTest
                      + "  \"mergeBytesLimit\": 1,\n"
                      + "  \"mergeSegmentsLimit\" : 1,\n"
                      + "  \"maxSegmentsToMove\": 1,\n"
-                     + "  \"percentOfSegmentsToConsiderPerMove\": 1,\n"
                      + "  \"replicantLifetime\": 1,\n"
                      + "  \"replicationThrottleLimit\": 1,\n"
                      + "  \"balancerComputeThreads\": 2, \n"
-                     + "  \"emitBalancingStats\": true,\n"
                      + "  \"killAllDataSources\": true\n"
                      + "}\n";
 
@@ -596,11 +527,8 @@ public class CoordinatorDynamicConfigTest
         1,
         1,
         1,
-        true,
-        1,
         1,
         2,
-        true,
         ImmutableSet.of(),
         true,
         EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
@@ -623,12 +551,9 @@ public class CoordinatorDynamicConfigTest
         524288000,
         100,
         100,
-        100,
-        true,
         15,
         500,
         1,
-        false,
         emptyList,
         true,
         EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
@@ -654,12 +579,9 @@ public class CoordinatorDynamicConfigTest
         524288000,
         100,
         100,
-        100,
-        true,
         15,
         500,
         1,
-        false,
         ImmutableSet.of("DATASOURCE"),
         false,
         EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
@@ -699,9 +621,6 @@ public class CoordinatorDynamicConfigTest
             null,
             null,
             null,
-            null,
-            null,
-            null,
             null
         ).build(current)
     );
@@ -733,7 +652,7 @@ public class CoordinatorDynamicConfigTest
   }
 
   @Test
-  public void testEqualsAndHashCodeSanity()
+  public void testEqualsAndHashCode()
   {
     CoordinatorDynamicConfig config1 = CoordinatorDynamicConfig.builder().build();
     CoordinatorDynamicConfig config2 = CoordinatorDynamicConfig.builder().build();
@@ -747,12 +666,9 @@ public class CoordinatorDynamicConfigTest
       long expectedMergeBytesLimit,
       int expectedMergeSegmentsLimit,
       int expectedMaxSegmentsToMove,
-      int expectedPercentOfSegmentsToConsiderPerMove,
-      boolean expectedUseBatchedSegmentSampler,
       int expectedReplicantLifetime,
       int expectedReplicationThrottleLimit,
       int expectedBalancerComputeThreads,
-      boolean expectedEmitingBalancingStats,
       Set<String> expectedSpecificDataSourcesToKillUnusedSegmentsIn,
       boolean expectedKillUnusedSegmentsInAllDataSources,
       int expectedMaxSegmentsInNodeLoadingQueue,
@@ -770,12 +686,9 @@ public class CoordinatorDynamicConfigTest
     Assert.assertEquals(expectedMergeBytesLimit, config.getMergeBytesLimit());
     Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit());
     Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove());
-    Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove(), 0);
-    Assert.assertEquals(expectedUseBatchedSegmentSampler, config.useBatchedSegmentSampler());
     Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime());
     Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit());
     Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads());
-    Assert.assertEquals(expectedEmitingBalancingStats, config.emitBalancingStats());
     Assert.assertEquals(
         expectedSpecificDataSourcesToKillUnusedSegmentsIn,
         config.getSpecificDataSourcesToKillUnusedSegmentsIn()
diff --git a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
index f1d460a341..f8ca99c221 100644
--- a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
+++ b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
@@ -24,7 +24,6 @@ import type { Field } from '../../components';
 export interface CoordinatorDynamicConfig {
   maxSegmentsToMove?: number;
   balancerComputeThreads?: number;
-  emitBalancingStats?: boolean;
   killAllDataSources?: boolean;
   killDataSourceWhitelist?: string[];
   killPendingSegmentsSkipList?: string[];
@@ -58,17 +57,6 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field<CoordinatorDynamicConfig>[
       </>
     ),
   },
-  {
-    name: 'emitBalancingStats',
-    type: 'boolean',
-    defaultValue: false,
-    info: (
-      <>
-        Boolean flag for whether or not we should emit balancing stats. This is an expensive
-        operation.
-      </>
-    ),
-  },
   {
     name: 'killDataSourceWhitelist',
     label: 'Kill datasource whitelist',
@@ -183,19 +171,6 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field<CoordinatorDynamicConfig>[
       </>
     ),
   },
-  {
-    name: 'useBatchedSegmentSampler',
-    type: 'boolean',
-    defaultValue: true,
-    info: (
-      <>
-        Boolean flag for whether or not we should use the Reservoir Sampling with a reservoir of
-        size k instead of fixed size 1 to pick segments to move. This option can be enabled to speed
-        up segment balancing process, especially if there are huge number of segments in the cluster
-        or if there are too many segments to move.
-      </>
-    ),
-  },
   {
     name: 'useRoundRobinSegmentAssignment',
     type: 'boolean',
@@ -208,28 +183,6 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field<CoordinatorDynamicConfig>[
       </>
     ),
   },
-  {
-    name: 'percentOfSegmentsToConsiderPerMove',
-    type: 'number',
-    defaultValue: 100,
-    info: (
-      <>
-        Deprecated. This will eventually be phased out by the batched segment sampler. You can
-        enable the batched segment sampler now by setting the dynamic Coordinator config,
-        useBatchedSegmentSampler, to true. Note that if you choose to enable the batched segment
-        sampler, percentOfSegmentsToConsiderPerMove will no longer have any effect on balancing. If
-        useBatchedSegmentSampler == false, this config defines the percentage of the total number of
-        segments in the cluster that are considered every time a segment needs to be selected for a
-        move. Druid orders servers by available capacity ascending (the least available capacity
-        first) and then iterates over the servers. For each server, Druid iterates over the segments
-        on the server, considering them for moving. The default config of 100% means that every
-        segment on every server is a candidate to be moved. This should make sense for most small to
-        medium-sized clusters. However, an admin may find it preferable to drop this value lower if
-        they don&apos;t think that it is worthwhile to consider every single segment in the cluster
-        each time it is looking for a segment to move.
-      </>
-    ),
-  },
   {
     name: 'pauseCoordination',
     type: 'boolean',


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to