This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d4ac64178 Adapt maxSegmentsToMove based on cluster skew (#14584)
5d4ac64178 is described below

commit 5d4ac641783a359bc9ca11d20d25ad99310e0fc5
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Aug 17 11:14:54 2023 +0530

    Adapt maxSegmentsToMove based on cluster skew (#14584)
    
    Changes:
    - No change in behaviour if `smartSegmentLoading` is disabled
    - If `smartSegmentLoading` is enabled
      - Compute `balancerComputeThreads` based on `numUsedSegments`
      - Compute `maxSegmentsToMove` based on `balancerComputeThreads`
      - Compute `segmentsToMoveToFixSkew` based on usage skew
      - Compute `segmentsToMove = Math.min(maxSegmentsToMove, segmentsToMoveToFixSkew)`
    
    Limits:
    - 1 <= `balancerComputeThreads` <= 8
    - `maxSegmentsToMove` <= 20% of total segments
    - `minSegmentsToMove` = 0.15% of total segments
---
 .../druid/server/coordinator/DruidCoordinator.java |  46 +--
 .../coordinator/SegmentCountsPerInterval.java      |  25 +-
 .../druid/server/coordinator/ServerHolder.java     |  24 +-
 .../coordinator/balancer/BalancerStrategy.java     |   5 +-
 .../coordinator/balancer/CostBalancerStrategy.java |  14 +-
 .../balancer/RandomBalancerStrategy.java           |   2 +-
 .../balancer/SegmentToMoveCalculator.java          | 307 +++++++++++++++++++++
 .../coordinator/balancer/TierSegmentBalancer.java  | 116 ++++----
 .../server/coordinator/duty/BalanceSegments.java   |  74 ++++-
 .../duty/CollectSegmentAndServerStats.java         |   4 +-
 .../coordinator/duty/UnloadUnusedSegments.java     |   2 +-
 .../coordinator/loading/SegmentLoadingConfig.java  |  74 ++---
 .../coordinator/stats/CoordinatorRunStats.java     |   2 +
 .../coordinator/BalanceSegmentsProfiler.java       |   5 +-
 .../server/coordinator/DruidCoordinatorTest.java   |  26 +-
 .../druid/server/coordinator/ServerHolderTest.java |   2 -
 .../balancer/CostBalancerStrategyTest.java         |  17 +-
 .../balancer/SegmentToMoveCalculatorTest.java      | 261 ++++++++++++++++++
 .../coordinator/duty/BalanceSegmentsTest.java      |   3 +-
 .../loading/SegmentLoadingConfigTest.java          |  51 ++++
 .../simulate/CoordinatorSimulationBaseTest.java    |   2 +-
 .../simulate/CoordinatorSimulationBuilder.java     |   5 +-
 .../coordinator/simulate/SegmentBalancingTest.java |  37 +++
 23 files changed, 940 insertions(+), 164 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 09d8e2b687..fd017dd5ed 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -542,26 +542,37 @@ public class DruidCoordinator
     }
   }
 
+  /**
+   * Resets the balancerExec if required and creates a new BalancerStrategy for
+   * the current coordinator run.
+   */
   @VisibleForTesting
-  protected void initBalancerExecutor()
+  BalancerStrategy createBalancerStrategy(int balancerComputeThreads)
   {
-    final int currentNumber = getDynamicConfigs().getBalancerComputeThreads();
-
+    // Reset balancerExecutor if required
     if (balancerExec == null) {
-      balancerExec = createNewBalancerExecutor(currentNumber);
-    } else if (cachedBalancerThreadNumber != currentNumber) {
+      balancerExec = createNewBalancerExecutor(balancerComputeThreads);
+    } else if (cachedBalancerThreadNumber != balancerComputeThreads) {
       log.info(
-          "balancerComputeThreads has changed from [%d] to [%d], recreating 
the thread pool.",
-          cachedBalancerThreadNumber,
-          currentNumber
+          "'balancerComputeThreads' has changed from [%d] to [%d]",
+          cachedBalancerThreadNumber, balancerComputeThreads
       );
       balancerExec.shutdownNow();
-      balancerExec = createNewBalancerExecutor(currentNumber);
+      balancerExec = createNewBalancerExecutor(balancerComputeThreads);
     }
+
+    // Create BalancerStrategy
+    final BalancerStrategy balancerStrategy = 
balancerStrategyFactory.createBalancerStrategy(balancerExec);
+    log.info(
+        "Using balancer strategy[%s] with [%d] threads.",
+        balancerStrategy.getClass().getSimpleName(), balancerComputeThreads
+    );
+    return balancerStrategy;
   }
 
   private ListeningExecutorService createNewBalancerExecutor(int numThreads)
   {
+    log.info("Creating new balancer executor with [%d] threads.", numThreads);
     cachedBalancerThreadNumber = numThreads;
     return MoreExecutors.listeningDecorator(
         Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s")
@@ -576,7 +587,7 @@ public class DruidCoordinator
         new UpdateReplicationStatus(),
         new UnloadUnusedSegments(loadQueueManager),
         new 
MarkOvershadowedSegmentsAsUnused(segmentsMetadataManager::markSegmentsAsUnused),
-        new BalanceSegments(),
+        new BalanceSegments(config.getCoordinatorPeriod()),
         new CollectSegmentAndServerStats(DruidCoordinator.this)
     );
   }
@@ -750,7 +761,7 @@ public class DruidCoordinator
           );
 
           log.info(
-              "Emitted [%d] stats for group [%s]. All collected stats:%s\n",
+              "Emitted [%d] stats for group [%s]. All collected stats:%s",
               emittedCount.get(), dutyGroupName, allStats.buildStatsTable()
           );
         }
@@ -758,7 +769,7 @@ public class DruidCoordinator
         // Emit the runtime of the full DutiesRunnable
         final long runMillis = 
groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS);
         emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), 
runMillis);
-        log.info("Finished coordinator run for group [%s] in [%d] ms", 
dutyGroupName, runMillis);
+        log.info("Finished coordinator run for group [%s] in [%d] ms.%n", 
dutyGroupName, runMillis);
       }
       catch (Exception e) {
         log.makeAlert(e, "Caught exception, ignoring so that schedule keeps 
going.").emit();
@@ -818,15 +829,8 @@ public class DruidCoordinator
       final DruidCluster cluster = prepareCluster(dynamicConfig, 
segmentLoadingConfig, currentServers);
       cancelLoadsOnDecommissioningServers(cluster);
 
-      initBalancerExecutor();
-      final BalancerStrategy balancerStrategy = 
balancerStrategyFactory.createBalancerStrategy(balancerExec);
-      log.info(
-          "Using balancer strategy [%s] with round-robin assignment [%s] and 
debug dimensions [%s].",
-          balancerStrategy.getClass().getSimpleName(),
-          segmentLoadingConfig.isUseRoundRobinSegmentAssignment(),
-          dynamicConfig.getDebugDimensions()
-      );
-
+      final BalancerStrategy balancerStrategy
+          = 
createBalancerStrategy(segmentLoadingConfig.getBalancerComputeThreads());
       return params.buildFromExisting()
                    .withDruidCluster(cluster)
                    .withBalancerStrategy(balancerStrategy)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
index ea1f81ee6d..a9767b8f76 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
@@ -33,18 +33,37 @@ import java.util.Map;
  */
 public class SegmentCountsPerInterval
 {
-  private final Map<String, Object2IntMap<Interval>>
-      datasourceIntervalToSegmentCount = new HashMap<>();
+  private int totalSegments;
+  private long totalSegmentBytes;
+  private final Map<String, Object2IntMap<Interval>> 
datasourceIntervalToSegmentCount = new HashMap<>();
   private final Object2IntMap<Interval> intervalToTotalSegmentCount = new 
Object2IntOpenHashMap<>();
+  private final Object2IntMap<String> datasourceToTotalSegmentCount = new 
Object2IntOpenHashMap<>();
 
   public void addSegment(DataSegment segment)
   {
     updateCountInInterval(segment, 1);
+    totalSegmentBytes += segment.getSize();
   }
 
   public void removeSegment(DataSegment segment)
   {
     updateCountInInterval(segment, -1);
+    totalSegmentBytes -= segment.getSize();
+  }
+
+  public int getTotalSegmentCount()
+  {
+    return totalSegments;
+  }
+
+  public long getTotalSegmentBytes()
+  {
+    return totalSegmentBytes;
+  }
+
+  public Object2IntMap<String> getDatasourceToTotalSegmentCount()
+  {
+    return datasourceToTotalSegmentCount;
   }
 
   public Object2IntMap<Interval> getIntervalToSegmentCount(String datasource)
@@ -59,7 +78,9 @@ public class SegmentCountsPerInterval
 
   private void updateCountInInterval(DataSegment segment, int delta)
   {
+    totalSegments += delta;
     intervalToTotalSegmentCount.mergeInt(segment.getInterval(), delta, 
Integer::sum);
+    datasourceToTotalSegmentCount.mergeInt(segment.getDataSource(), delta, 
Integer::sum);
     datasourceIntervalToSegmentCount
         .computeIfAbsent(segment.getDataSource(), ds -> new 
Object2IntOpenHashMap<>())
         .mergeInt(segment.getInterval(), delta, Integer::sum);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index d55ac035d6..5de1bd5ee0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -181,6 +181,23 @@ public class ServerHolder implements 
Comparable<ServerHolder>
     return server.getMaxSize();
   }
 
+  /**
+   * Total projected disk usage of this server in bytes.
+   * <p>
+   * The total size:
+   * <ol>
+   * <li>INCLUDES segments loaded on this server</li>
+   * <li>INCLUDES segments loading on this server (actions: LOAD/REPLICATE)</li>
+   * <li>INCLUDES segments moving to this server (action: MOVE_TO)</li>
+   * <li>INCLUDES segments moving from this server (action: MOVE_FROM). This is
+   * because these segments have only been <i>marked</i> for drop. We include
+   * the size of these segments to avoid over-assigning the server in case the
+   * corresponding MOVE_TO operation gets delayed or fails.</li>
+   * <li>EXCLUDES segments dropping from this server (action: DROP). Excluding
+   * these segments cannot result in over-assignment because drops are always
+   * processed before loads.</li>
+   * </ol>
+   */
   public long getSizeUsed()
   {
     return server.getCurrSize() + sizeOfLoadingSegments - 
sizeOfDroppingSegments;
@@ -317,6 +334,11 @@ public class ServerHolder implements 
Comparable<ServerHolder>
     return loadingReplicaCount;
   }
 
+  public int getNumQueuedSegments()
+  {
+    return queuedSegments.size();
+  }
+
   public boolean startOperation(SegmentAction action, DataSegment segment)
   {
     if (queuedSegments.containsKey(segment)) {
@@ -349,7 +371,7 @@ public class ServerHolder implements 
Comparable<ServerHolder>
     }
   }
 
-  public boolean hasSegmentLoaded(SegmentId segmentId)
+  private boolean hasSegmentLoaded(SegmentId segmentId)
   {
     return server.getSegment(segmentId) != null;
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
index e133430b1b..9def032834 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
@@ -72,8 +72,7 @@ public interface BalancerStrategy
   Iterator<ServerHolder> findServersToDropSegment(DataSegment segmentToDrop, 
List<ServerHolder> serverHolders);
 
   /**
-   * Returns the stats collected by the strategy in the current run and resets
-   * the stats collector for the next run.
+   * Returns the stats collected by the strategy.
    */
-  CoordinatorRunStats getAndResetStats();
+  CoordinatorRunStats getStats();
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
index aae907bb59..96a6ccccf5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 public class CostBalancerStrategy implements BalancerStrategy
@@ -68,6 +69,7 @@ public class CostBalancerStrategy implements BalancerStrategy
       .thenComparing(pair -> pair.rhs);
 
   private final CoordinatorRunStats stats = new CoordinatorRunStats();
+  private final AtomicLong computeTimeNanos = new AtomicLong(0);
 
   public static double computeJointSegmentsCost(DataSegment segment, 
Iterable<DataSegment> segmentSet)
   {
@@ -263,9 +265,13 @@ public class CostBalancerStrategy implements 
BalancerStrategy
   }
 
   @Override
-  public CoordinatorRunStats getAndResetStats()
+  public CoordinatorRunStats getStats()
   {
-    return stats.getSnapshotAndReset();
+    stats.add(
+        Stats.Balancer.COMPUTATION_TIME,
+        TimeUnit.NANOSECONDS.toMillis(computeTimeNanos.getAndSet(0))
+    );
+    return stats;
   }
 
   /**
@@ -351,8 +357,8 @@ public class CostBalancerStrategy implements 
BalancerStrategy
 
     // Report computation stats
     computeTime.stop();
-    stats.add(Stats.Balancer.COMPUTATION_COUNT, metricKey, 1);
-    stats.add(Stats.Balancer.COMPUTATION_TIME, metricKey, 
computeTime.elapsed(TimeUnit.MILLISECONDS));
+    stats.add(Stats.Balancer.COMPUTATION_COUNT, 1);
+    computeTimeNanos.addAndGet(computeTime.elapsed(TimeUnit.NANOSECONDS));
 
     return costPrioritizedServers.stream().map(pair -> pair.rhs)
                                  .collect(Collectors.toList());
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
index fc0da34310..b8e0000567 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
@@ -73,7 +73,7 @@ public class RandomBalancerStrategy implements 
BalancerStrategy
   }
 
   @Override
-  public CoordinatorRunStats getAndResetStats()
+  public CoordinatorRunStats getStats()
   {
     return CoordinatorRunStats.empty();
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java
new file mode 100644
index 0000000000..d8846740a7
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.balancer;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.SegmentCountsPerInterval;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.joda.time.Duration;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Calculates the maximum, minimum and required number of segments to move in a
+ * Coordinator run for balancing.
+ */
+public class SegmentToMoveCalculator
+{
+  /**
+   * At least this number of segments must be picked for moving in every cycle
+   * to keep the cluster well balanced.
+   */
+  private static final int MIN_SEGMENTS_TO_MOVE = 100;
+
+  private static final Logger log = new Logger(SegmentToMoveCalculator.class);
+
+  /**
+   * Calculates the number of segments to be picked for moving in the given tier,
+   * based on the level of skew between the historicals in the tier.
+   *
+   * @param tier                    Name of tier used for logging purposes
+   * @param historicals             Active historicals in tier
+   * @param maxSegmentsToMoveInTier Maximum number of segments allowed to be moved
+   *                                in the tier.
+   * @return Number of segments to move in the tier in the range
+   * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code maxSegmentsToMoveInTier}].
+   */
+  public static int computeNumSegmentsToMoveInTier(
+      String tier,
+      List<ServerHolder> historicals,
+      int maxSegmentsToMoveInTier
+  )
+  {
+    final int totalSegments = historicals.stream().mapToInt(
+        server -> server.getProjectedSegments().getTotalSegmentCount()
+    ).sum();
+
+    // Move at least some segments to ensure that the cluster is always 
balancing itself
+    final int minSegmentsToMove = SegmentToMoveCalculator
+        .computeMinSegmentsToMoveInTier(totalSegments);
+    final int segmentsToMoveToFixDeviation = SegmentToMoveCalculator
+        .computeNumSegmentsToMoveToBalanceTier(tier, historicals);
+    log.info(
+        "Need to move [%,d] segments in tier[%s] to attain balance. Allowed 
values are [min=%d, max=%d].",
+        segmentsToMoveToFixDeviation, tier, minSegmentsToMove, 
maxSegmentsToMoveInTier
+    );
+
+    final int activeSegmentsToMove = Math.max(minSegmentsToMove, 
segmentsToMoveToFixDeviation);
+    return Math.min(activeSegmentsToMove, maxSegmentsToMoveInTier);
+  }
+
+  /**
+   * Calculates the minimum number of segments that should be considered for
+   * moving in a tier, so that the cluster is always balancing itself.
+   * <p>
+   * This value must be calculated separately for every tier.
+   *
+   * @param totalSegmentsInTier Total number of all replicas of all segments
+   *                            loaded or queued across all historicals in the tier.
+   * @return {@code minSegmentsToMoveInTier} in the range
+   * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code ~0.15% of totalSegmentsInTier}].
+   */
+  public static int computeMinSegmentsToMoveInTier(int totalSegmentsInTier)
+  {
+    // Divide by 2^16 and multiply by 100 so that the value increases
+    // in steps of 100 for every 2^16 = ~65k segments
+    int upperBound = (totalSegmentsInTier >> 16) * 100;
+    int lowerBound = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier);
+    return Math.max(lowerBound, upperBound);
+  }
+
+  /**
+   * Calculates the maximum number of segments that can be picked for moving in
+   * the cluster in a single coordinator run.
+   * <p>
+   * This value must be calculated at the cluster level and then applied
+   * to every tier so that the total computation time is estimated correctly.
+   * <p>
+   * Each balancer thread can perform 1 billion computations in 20s (see #14584).
+   * Therefore, keeping a buffer of 10s, in every 30s:
+   * <pre>
+   * numComputations = maxSegmentsToMove * totalSegments
+   *
+   * maxSegmentsToMove = numComputations / totalSegments
+   *                   = (nThreads * 1B) / totalSegments
+   * </pre>
+   *
+   * @param totalSegments Total number of all replicas of all segments loaded or
+   *                      queued across all historicals in the cluster.
+   * @return {@code maxSegmentsToMove} per tier in the range
+   * [{@link #MIN_SEGMENTS_TO_MOVE}, ~20% of totalSegments].
+   * @see <a href="https://github.com/apache/druid/pull/14584">#14584</a>
+   */
+  public static int computeMaxSegmentsToMovePerTier(
+      int totalSegments,
+      int numBalancerThreads,
+      Duration coordinatorPeriod
+  )
+  {
+    Preconditions.checkArgument(
+        numBalancerThreads > 0 && numBalancerThreads <= 100,
+        "Number of balancer threads must be in range (0, 100]."
+    );
+    if (totalSegments <= 0) {
+      return 0;
+    }
+
+    // Divide by 2^9 and multiply by 100 so that the upperBound
+    // increases in steps of 100 for every 2^9 = 512 segments (~20%)
+    final int upperBound = (totalSegments >> 9) * 100;
+    final int lowerBound = MIN_SEGMENTS_TO_MOVE;
+
+    int num30sPeriods = Math.min(4, (int) (coordinatorPeriod.getMillis() / 
30_000));
+
+    // Each thread can do ~1B computations in 30s = 1M * 1k = 2^20 * 1k
+    int maxComputationsInThousands = (numBalancerThreads * num30sPeriods) << 
20;
+    int maxSegmentsToMove = (maxComputationsInThousands / totalSegments) * 
1000;
+
+    if (upperBound < lowerBound) {
+      return Math.min(lowerBound, totalSegments);
+    } else {
+      return Math.min(maxSegmentsToMove, upperBound);
+    }
+  }
+
+  /**
+   * Computes the number of segments that need to be moved across the historicals
+   * in a tier to attain balance in terms of disk usage and segment counts per
+   * data source.
+   *
+   * @param tier        Name of the tier used only for logging purposes
+   * @param historicals List of historicals in the tier
+   */
+  public static int computeNumSegmentsToMoveToBalanceTier(String tier, 
List<ServerHolder> historicals)
+  {
+    if (historicals.isEmpty()) {
+      return 0;
+    }
+
+    return Math.max(
+        computeSegmentsToMoveToBalanceCountsPerDatasource(tier, historicals),
+        computeSegmentsToMoveToBalanceDiskUsage(tier, historicals)
+    );
+  }
+
+  private static double getAverageSegmentSize(List<ServerHolder> servers)
+  {
+    int totalSegmentCount = 0;
+    long totalUsageBytes = 0;
+    for (ServerHolder server : servers) {
+      totalSegmentCount += 
server.getProjectedSegments().getTotalSegmentCount();
+      totalUsageBytes += server.getProjectedSegments().getTotalSegmentBytes();
+    }
+
+    if (totalSegmentCount <= 0 || totalUsageBytes <= 0) {
+      return 0;
+    } else {
+      return (1.0 * totalUsageBytes) / totalSegmentCount;
+    }
+  }
+
+  /**
+   * Computes the number of segments to move across the servers of the tier in
+   * order to balance the segment counts of the most unbalanced datasource.
+   */
+  static int computeSegmentsToMoveToBalanceCountsPerDatasource(
+      String tier,
+      List<ServerHolder> servers
+  )
+  {
+    // Find all the datasources
+    final Set<String> datasources = servers.stream().flatMap(
+        s -> 
s.getProjectedSegments().getDatasourceToTotalSegmentCount().keySet().stream()
+    ).collect(Collectors.toSet());
+    if (datasources.isEmpty()) {
+      return 0;
+    }
+
+    // Compute the min and max number of segments for each datasource
+    final Object2IntMap<String> datasourceToMaxSegments = new 
Object2IntOpenHashMap<>();
+    final Object2IntMap<String> datasourceToMinSegments = new 
Object2IntOpenHashMap<>();
+    for (ServerHolder server : servers) {
+      final Object2IntMap<String> datasourceToSegmentCount
+          = server.getProjectedSegments().getDatasourceToTotalSegmentCount();
+      for (String datasource : datasources) {
+        int count = datasourceToSegmentCount.getInt(datasource);
+        datasourceToMaxSegments.mergeInt(datasource, count, Math::max);
+        datasourceToMinSegments.mergeInt(datasource, count, Math::min);
+      }
+    }
+
+    // Compute the gap between min and max for each datasource and order by 
largest first
+    final TreeMap<Integer, String> countDiffToDatasource = new 
TreeMap<>(Comparator.reverseOrder());
+    datasourceToMaxSegments.object2IntEntrySet().forEach(entry -> {
+      String datasource = entry.getKey();
+      int maxCount = entry.getIntValue();
+      int minCount = datasourceToMinSegments.getInt(datasource);
+      countDiffToDatasource.put(maxCount - minCount, datasource);
+    });
+
+    // Identify the most unbalanced datasource
+    final Map.Entry<Integer, String> maxCountDifference = 
countDiffToDatasource.firstEntry();
+    String mostUnbalancedDatasource = maxCountDifference.getValue();
+    int minNumSegments = Integer.MAX_VALUE;
+    int maxNumSegments = 0;
+    for (ServerHolder server : servers) {
+      int countForSkewedDatasource = server.getProjectedSegments()
+                                           .getDatasourceToTotalSegmentCount()
+                                           .getInt(mostUnbalancedDatasource);
+
+      minNumSegments = Math.min(minNumSegments, countForSkewedDatasource);
+      maxNumSegments = Math.max(maxNumSegments, countForSkewedDatasource);
+    }
+
+    final int numSegmentsToMove = maxCountDifference.getKey() / 2;
+    if (numSegmentsToMove > 0) {
+      log.info(
+          "Need to move [%,d] segments of datasource[%s] in tier[%s] to fix 
gap between min[%,d] and max[%,d].",
+          numSegmentsToMove, mostUnbalancedDatasource, tier, minNumSegments, 
maxNumSegments
+      );
+    }
+    return numSegmentsToMove;
+  }
+
+  private static int computeSegmentsToMoveToBalanceDiskUsage(
+      String tier,
+      List<ServerHolder> servers
+  )
+  {
+    if (servers.isEmpty()) {
+      return 0;
+    }
+
+    double maxUsagePercent = 0.0;
+    double minUsagePercent = 100.0;
+
+    long maxUsageBytes = 0;
+    long minUsageBytes = Long.MAX_VALUE;
+    for (ServerHolder server : servers) {
+      final SegmentCountsPerInterval projectedSegments = 
server.getProjectedSegments();
+
+      // Track the maximum and minimum values
+      long serverUsageBytes = projectedSegments.getTotalSegmentBytes();
+      maxUsageBytes = Math.max(serverUsageBytes, maxUsageBytes);
+      minUsageBytes = Math.min(serverUsageBytes, minUsageBytes);
+
+      double diskUsage = server.getMaxSize() <= 0
+                         ? 0 : (100.0 * 
projectedSegments.getTotalSegmentBytes()) / server.getMaxSize();
+      maxUsagePercent = Math.max(diskUsage, maxUsagePercent);
+      minUsagePercent = Math.min(diskUsage, minUsagePercent);
+    }
+
+    final double averageSegmentSize = getAverageSegmentSize(servers);
+    final long differenceInUsageBytes = maxUsageBytes - minUsageBytes;
+    final int numSegmentsToMove = averageSegmentSize <= 0
+                                  ? 0 : (int) (differenceInUsageBytes / 
averageSegmentSize) / 2;
+
+    if (numSegmentsToMove > 0) {
+      log.info(
+          "Need to move [%,d] segments of avg size [%,d MB] in tier[%s] to fix"
+          + " disk usage gap between min[%d GB][%.1f%%] and max[%d 
GB][%.1f%%].",
+          numSegmentsToMove, ((long) averageSegmentSize) >> 20, tier,
+          minUsageBytes >> 30, minUsagePercent, maxUsageBytes >> 30, 
maxUsagePercent
+      );
+    }
+    return numSegmentsToMove;
+  }
+
+  private SegmentToMoveCalculator()
+  {
+    // no instantiation
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
index 86e5b99629..1523576b11 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
@@ -21,9 +21,9 @@ package org.apache.druid.server.coordinator.balancer;
 
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
 import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Dimension;
@@ -55,27 +55,23 @@ public class TierSegmentBalancer
   private final DruidCoordinatorRuntimeParams params;
   private final StrategicSegmentAssigner segmentAssigner;
 
-  private final SegmentLoadingConfig loadingConfig;
   private final CoordinatorRunStats runStats;
 
   private final List<ServerHolder> activeServers;
   private final List<ServerHolder> decommissioningServers;
-  private final int totalMaxSegmentsToMove;
-
+  private final int maxSegmentsToMove;
   private final int movingSegmentCount;
 
   public TierSegmentBalancer(
       String tier,
       Set<ServerHolder> servers,
+      int maxSegmentsToMove,
       DruidCoordinatorRuntimeParams params
   )
   {
     this.tier = tier;
     this.params = params;
     this.segmentAssigner = params.getSegmentAssigner();
-
-    this.loadingConfig = params.getSegmentLoadingConfig();
-    this.totalMaxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();
     this.runStats = segmentAssigner.getStats();
 
     Map<Boolean, List<ServerHolder>> partitions =
@@ -84,55 +80,29 @@ public class TierSegmentBalancer
     this.activeServers = partitions.get(false);
 
     this.movingSegmentCount = 
activeServers.stream().mapToInt(ServerHolder::getNumMovingSegments).sum();
+    this.maxSegmentsToMove = maxSegmentsToMove;
   }
 
   public void run()
   {
-    if (activeServers.isEmpty() || (activeServers.size() <= 1 && 
decommissioningServers.isEmpty())) {
-      log.warn(
-          "Skipping balance for tier [%s] with [%d] active servers and [%d] 
decomissioning servers.",
-          tier, activeServers.size(), decommissioningServers.size()
-      );
-      return;
-    }
-
-    log.info(
-        "Moving max [%d] segments in tier [%s] with [%d] active servers and"
-        + " [%d] decommissioning servers. There are [%d] segments already in 
queue.",
-        totalMaxSegmentsToMove, tier, activeServers.size(), 
decommissioningServers.size(), movingSegmentCount
-    );
-
-    // Move segments from decommissioning to active servers
-    int movedDecommSegments = 0;
-    if (!decommissioningServers.isEmpty()) {
-      int maxDecommPercentToMove = 
loadingConfig.getPercentDecommSegmentsToMove();
-      int maxDecommSegmentsToMove = (int) Math.ceil(totalMaxSegmentsToMove * 
(maxDecommPercentToMove / 100.0));
-      movedDecommSegments +=
-          moveSegmentsFromTo(decommissioningServers, activeServers, 
maxDecommSegmentsToMove);
-      log.info(
-          "Moved [%d] segments out of max [%d (%d%%)] from decommissioning to 
active servers in tier [%s].",
-          movedDecommSegments, maxDecommSegmentsToMove, 
maxDecommPercentToMove, tier
-      );
-    }
+    int numDecommSegmentsToMove = 
getNumDecommSegmentsToMove(maxSegmentsToMove);
+    moveSegmentsFrom(decommissioningServers, numDecommSegmentsToMove, 
"decommissioning");
 
-    // Move segments across active servers
-    int maxGeneralSegmentsToMove = totalMaxSegmentsToMove - 
movedDecommSegments;
-    int movedGeneralSegments =
-        moveSegmentsFromTo(activeServers, activeServers, 
maxGeneralSegmentsToMove);
-    log.info(
-        "Moved [%d] segments out of max [%d] between active servers in tier 
[%s].",
-        movedGeneralSegments, maxGeneralSegmentsToMove, tier
-    );
+    int numActiveSegmentsToMove = getNumActiveSegmentsToMove(maxSegmentsToMove 
- numDecommSegmentsToMove);
+    moveSegmentsFrom(activeServers, numActiveSegmentsToMove, "active");
   }
 
-  private int moveSegmentsFromTo(
-      List<ServerHolder> sourceServers,
-      List<ServerHolder> destServers,
-      int maxSegmentsToMove
+  /**
+   * Moves segments from the given source servers to the active servers in 
this tier.
+   */
+  private void moveSegmentsFrom(
+      final List<ServerHolder> sourceServers,
+      final int numSegmentsToMove,
+      final String sourceServerType
   )
   {
-    if (maxSegmentsToMove <= 0 || sourceServers.isEmpty() || 
destServers.isEmpty()) {
-      return 0;
+    if (numSegmentsToMove <= 0 || sourceServers.isEmpty() || 
activeServers.isEmpty()) {
+      return;
     }
 
     final Set<String> broadcastDatasources = params.getBroadcastDatasources();
@@ -140,25 +110,30 @@ public class TierSegmentBalancer
     // Always move loading segments first as it is a cheaper operation
     List<BalancerSegmentHolder> pickedSegments = 
ReservoirSegmentSampler.pickMovableSegmentsFrom(
         sourceServers,
-        maxSegmentsToMove,
+        numSegmentsToMove,
         ServerHolder::getLoadingSegments,
         broadcastDatasources
     );
-    int movedCount = moveSegmentsTo(destServers, pickedSegments, 
maxSegmentsToMove);
+    int movedCount = moveSegmentsTo(activeServers, pickedSegments, 
numSegmentsToMove);
 
     // Move loaded segments only if tier is not already busy moving segments
     if (movingSegmentCount <= 0) {
-      maxSegmentsToMove -= movedCount;
+      int numLoadedSegmentsToMove = numSegmentsToMove - movedCount;
       pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom(
           sourceServers,
-          maxSegmentsToMove,
+          numLoadedSegmentsToMove,
           server -> server.getServer().iterateAllSegments(),
           broadcastDatasources
       );
-      movedCount += moveSegmentsTo(destServers, pickedSegments, 
maxSegmentsToMove);
+      movedCount += moveSegmentsTo(activeServers, pickedSegments, 
numLoadedSegmentsToMove);
+    } else {
+      log.info("There are already [%,d] segments moving in tier[%s].", 
movingSegmentCount, tier);
     }
 
-    return movedCount;
+    log.info(
+        "Moved [%,d of %,d] segments from [%d] [%s] servers in tier [%s].",
+        movedCount, numSegmentsToMove, sourceServers.size(), sourceServerType, 
tier
+    );
   }
 
   private int moveSegmentsTo(
@@ -221,4 +196,39 @@ public class TierSegmentBalancer
     runStats.add(Stats.Segments.MOVE_SKIPPED, key, 1);
   }
 
+  /**
+   * Number of segments to move away from the decommissioning historicals of 
this tier.
+   * <ul>
+   * <li>Returns 0 if this tier has no decommissioning server or no active
+   * server.</li>
+   * <li>If smartSegmentLoading is enabled, returns the sum of
+   * {@code getProjectedSegments().getTotalSegmentCount()} over all the
+   * decommissioning servers, capped at {@code maxSegmentsToMove}.</li>
+   * <li>Otherwise, returns the configured
+   * {@code decommissioningMaxPercentOfMaxSegmentsToMove} percent of
+   * {@code maxSegmentsToMove}, rounded up to the next integer.</li>
+   * </ul>
+   *
+   * @param maxSegmentsToMove Limit used either as the cap (smart segment
+   *                          loading) or as the base of the percentage
+   * @return Number of segments to pick from the decommissioning servers
+   */
+  private int getNumDecommSegmentsToMove(int maxSegmentsToMove)
+  {
+    final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
+    if (decommissioningServers.isEmpty() || activeServers.isEmpty()) {
+      return 0;
+    } else if (dynamicConfig.isSmartSegmentLoading()) {
+      final int decommSegmentsToMove = 
decommissioningServers.stream().mapToInt(
+          server -> server.getProjectedSegments().getTotalSegmentCount()
+      ).sum();
+      return Math.min(decommSegmentsToMove, maxSegmentsToMove);
+    } else {
+      int maxPercentageToMove = 
dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove();
+      return (int) Math.ceil(maxSegmentsToMove * (maxPercentageToMove / 
100.0));
+    }
+  }
+
+  /**
+   * Number of segments to move between the active historicals of this tier.
+   * <ul>
+   * <li>Returns 0 if this tier has fewer than 2 active servers.</li>
+   * <li>If smartSegmentLoading is enabled, returns the value computed by
+   * {@code SegmentToMoveCalculator.computeNumSegmentsToMoveInTier} for the
+   * active servers, given {@code maxActiveSegmentsToMove}.</li>
+   * <li>Otherwise, returns {@code maxActiveSegmentsToMove} unchanged.</li>
+   * </ul>
+   *
+   * @param maxActiveSegmentsToMove Limit for moves between active servers;
+   *                                the caller passes what remains of
+   *                                maxSegmentsToMove after the
+   *                                decommissioning moves are subtracted
+   * @return Number of segments to pick from the active servers
+   */
+  private int getNumActiveSegmentsToMove(int maxActiveSegmentsToMove)
+  {
+    final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
+    if (activeServers.size() < 2) {
+      return 0;
+    } else if (dynamicConfig.isSmartSegmentLoading()) {
+      return SegmentToMoveCalculator.computeNumSegmentsToMoveInTier(tier, 
activeServers, maxActiveSegmentsToMove);
+    } else {
+      // If smartSegmentLoading is disabled, just use the configured value
+      return maxActiveSegmentsToMove;
+    }
+  }
+
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
index 4ff2e65743..37e58ff28f 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
@@ -19,12 +19,19 @@
 
 package org.apache.druid.server.coordinator.duty;
 
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.balancer.SegmentToMoveCalculator;
 import org.apache.druid.server.coordinator.balancer.TierSegmentBalancer;
 import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.joda.time.Duration;
+
+import java.util.Set;
 
 /**
  *
@@ -33,6 +40,13 @@ public class BalanceSegments implements CoordinatorDuty
 {
   private static final EmittingLogger log = new 
EmittingLogger(BalanceSegments.class);
 
+  private final Duration coordinatorPeriod; // input to SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier
+
+  public BalanceSegments(Duration coordinatorPeriod)
+  {
+    this.coordinatorPeriod = coordinatorPeriod;
+  }
+
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
   {
@@ -43,27 +57,79 @@ public class BalanceSegments implements CoordinatorDuty
 
     final DruidCluster cluster = params.getDruidCluster();
     final SegmentLoadingConfig loadingConfig = 
params.getSegmentLoadingConfig();
-    final int maxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();
+
+    final int maxSegmentsToMove = getMaxSegmentsToMove(params);
     if (maxSegmentsToMove <= 0) {
       log.info("Skipping balance as maxSegmentsToMove is [%d].", 
maxSegmentsToMove);
       return params;
     } else {
       log.info(
-          "Balancing segments in tiers [%s] with maxSegmentsToMove=[%d], 
maxLifetime=[%d].",
+          "Balancing segments in tiers [%s] with maxSegmentsToMove[%,d] and 
maxLifetime[%d].",
           cluster.getTierNames(), maxSegmentsToMove, 
loadingConfig.getMaxLifetimeInLoadQueue()
       );
     }
 
     cluster.getHistoricals().forEach(
-        (tier, servers) -> new TierSegmentBalancer(tier, servers, params).run()
+        (tier, servers) -> new TierSegmentBalancer(tier, servers, 
maxSegmentsToMove, params).run()
     );
 
     CoordinatorRunStats runStats = params.getCoordinatorStats();
     params.getBalancerStrategy()
-          .getAndResetStats()
+          .getStats()
           .forEachStat(runStats::add);
 
     return params;
   }
 
+  /**
+   * Recomputes the value of {@code maxSegmentsToMove} if smart segment loading
+   * is enabled. {@code maxSegmentsToMove} defines only the upper bound, the 
actual
+   * number of segments picked for moving is determined by the {@link 
TierSegmentBalancer}
+   * based on the level of skew in the tier.
+   * <p>
+   * When smart segment loading is enabled, the value is obtained from
+   * {@code SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier} using the
+   * total number of segments on all historicals, the number of balancer
+   * compute threads from the segment loading config and the coordinator
+   * period. The computed value is also logged at info level.
+   * <p>
+   * When smart segment loading is disabled, the {@code maxSegmentsToMove}
+   * configured in the dynamic config is returned as is.
+   *
+   * @param params Runtime params providing the dynamic config, the cluster
+   *               and the segment loading config
+   * @return Upper bound on the number of segments to move
+   */
+  private int getMaxSegmentsToMove(DruidCoordinatorRuntimeParams params)
+  {
+    final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
+    if (dynamicConfig.isSmartSegmentLoading()) {
+      final Pair<Integer, Integer> numHistoricalsAndSegments = 
getNumHistoricalsAndSegments(params.getDruidCluster());
+      final int totalSegmentsInCluster = numHistoricalsAndSegments.rhs;
+
+      final int numBalancerThreads = 
params.getSegmentLoadingConfig().getBalancerComputeThreads();
+      final int maxSegmentsToMove = SegmentToMoveCalculator
+          .computeMaxSegmentsToMovePerTier(totalSegmentsInCluster, 
numBalancerThreads, coordinatorPeriod);
+      log.info(
+          "Computed maxSegmentsToMove[%,d] for total [%,d] segments on [%d] 
historicals.",
+          maxSegmentsToMove, totalSegmentsInCluster, 
numHistoricalsAndSegments.lhs
+      );
+
+      return maxSegmentsToMove;
+    } else {
+      return dynamicConfig.getMaxSegmentsToMove();
+    }
+  }
+
+  /**
+   * Calculates the total number of historicals (active and decommissioning) 
and
+   * the total number of segments on these historicals that would participate 
in
+   * cost computations. This includes all replicas of all loaded, loading, 
dropping
+   * and moving segments.
+   * <p>
+   * This is calculated here to ensure that all assignments done by the 
preceding
+   * {@link RunRules} duty are accounted for.
+   * <p>
+   * The segment count of each historical is the sum of
+   * {@code getServer().getNumSegments()} and {@code getNumQueuedSegments()}.
+   *
+   * @param cluster Cluster whose historicals, across all tiers, are counted
+   * @return Pair of (number of historicals, number of segments), in that
+   *         order
+   */
+  private Pair<Integer, Integer> getNumHistoricalsAndSegments(DruidCluster 
cluster)
+  {
+    int numHistoricals = 0;
+    int numSegments = 0;
+
+    for (Set<ServerHolder> historicals : cluster.getHistoricals().values()) {
+      for (ServerHolder historical : historicals) {
+        ++numHistoricals;
+        numSegments += historical.getServer().getNumSegments() + 
historical.getNumQueuedSegments();
+      }
+    }
+
+    return Pair.of(numHistoricals, numSegments);
+  }
+
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
index 1c0ff573d1..8c5acaeebb 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
@@ -144,8 +144,8 @@ public class CollectSegmentAndServerStats implements 
CoordinatorDuty
 
     final int numHistoricals = historicals.size();
     log.info(
-        "Tier [%s] is serving [%,d], loading [%,d] and dropping [%,d] segments"
-        + " across [%d] historicals with average usage [%d GBs], [%.1f%%].",
+        "Tier[%s] is serving [%,d], loading [%,d] and dropping [%,d] segments"
+        + " across [%d] historicals with average usage[%d GBs], [%.1f%%].",
         tier, servedCount.get(), loadingCount.get(), droppingCount.get(), 
numHistoricals,
         (currentBytesSum.get() >> 30) / numHistoricals, usageSum.get() / 
numHistoricals
     );
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
index 8bdcee3641..5a135d81cf 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
@@ -97,7 +97,7 @@ public class UnloadUnusedSegments implements CoordinatorDuty
             && loadQueueManager.dropSegment(segment, serverHolder)) {
           totalUnneededCount++;
           log.debug(
-              "Dropping uneeded segment[%s] from server[%s] in tier[%s]",
+              "Dropping uneeded segment[%s] from server[%s] in tier[%s].",
               segment.getId(), server.getName(), server.getTier()
           );
         }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
index bb42e95eaa..f08b7ed5ca 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
@@ -35,33 +35,27 @@ public class SegmentLoadingConfig
   private final int maxReplicaAssignmentsInRun;
   private final int maxLifetimeInLoadQueue;
 
-  private final int maxSegmentsToMove;
-  private final int percentDecommSegmentsToMove;
+  private final int balancerComputeThreads;
 
   private final boolean useRoundRobinSegmentAssignment;
 
   /**
-   * Creates a new SegmentLoadingConfig with recomputed coordinator config 
values from
+   * Creates a new SegmentLoadingConfig with recomputed coordinator config 
values
    * based on whether {@link CoordinatorDynamicConfig#isSmartSegmentLoading()}
    * is enabled or not.
    */
   public static SegmentLoadingConfig create(CoordinatorDynamicConfig 
dynamicConfig, int numUsedSegments)
   {
     if (dynamicConfig.isSmartSegmentLoading()) {
-      // Compute recommended values
-      // Impose a lower bound on both replicationThrottleLimit and 
maxSegmentsToMove
+      // Compute replicationThrottleLimit with a lower bound of 100
       final int throttlePercentage = 2;
       final int replicationThrottleLimit = Math.max(100, numUsedSegments * 
throttlePercentage / 100);
-
-      // Impose an upper bound on maxSegmentsToMove to ensure that coordinator
-      // run times are bounded. This limit can be relaxed as performance of
-      // the CostBalancerStrategy.computeCost() is improved.
-      final int maxSegmentsToMove = Math.min(1000, replicationThrottleLimit);
+      final int balancerComputeThreads = 
computeNumBalancerThreads(numUsedSegments);
 
       log.info(
-          "Smart segment loading is enabled. Recomputed 
replicationThrottleLimit"
-          + " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove 
[%,d].",
-          replicationThrottleLimit, throttlePercentage, numUsedSegments, 
maxSegmentsToMove
+          "Smart segment loading is enabled. Calculated 
balancerComputeThreads[%d]"
+          + " and replicationThrottleLimit[%,d] (%d%% of used segments[%,d]).",
+          balancerComputeThreads, replicationThrottleLimit, 
throttlePercentage, numUsedSegments
       );
 
       return new SegmentLoadingConfig(
@@ -69,9 +63,8 @@ public class SegmentLoadingConfig
           replicationThrottleLimit,
           Integer.MAX_VALUE,
           60,
-          maxSegmentsToMove,
-          100,
-          true
+          true,
+          balancerComputeThreads
       );
     } else {
       // Use the configured values
@@ -80,9 +73,8 @@ public class SegmentLoadingConfig
           dynamicConfig.getReplicationThrottleLimit(),
           dynamicConfig.getMaxNonPrimaryReplicantsToLoad(),
           dynamicConfig.getReplicantLifetime(),
-          dynamicConfig.getMaxSegmentsToMove(),
-          dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove(),
-          dynamicConfig.isUseRoundRobinSegmentAssignment()
+          dynamicConfig.isUseRoundRobinSegmentAssignment(),
+          dynamicConfig.getBalancerComputeThreads()
       );
     }
   }
@@ -92,18 +84,16 @@ public class SegmentLoadingConfig
       int replicationThrottleLimit,
       int maxReplicaAssignmentsInRun,
       int maxLifetimeInLoadQueue,
-      int maxSegmentsToMove,
-      int percentDecommSegmentsToMove,
-      boolean useRoundRobinSegmentAssignment
+      boolean useRoundRobinSegmentAssignment,
+      int balancerComputeThreads
   )
   {
     this.maxSegmentsInLoadQueue = maxSegmentsInLoadQueue;
     this.replicationThrottleLimit = replicationThrottleLimit;
     this.maxReplicaAssignmentsInRun = maxReplicaAssignmentsInRun;
     this.maxLifetimeInLoadQueue = maxLifetimeInLoadQueue;
-    this.maxSegmentsToMove = maxSegmentsToMove;
-    this.percentDecommSegmentsToMove = percentDecommSegmentsToMove;
     this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
+    this.balancerComputeThreads = balancerComputeThreads;
   }
 
   public int getMaxSegmentsInLoadQueue()
@@ -111,11 +101,6 @@ public class SegmentLoadingConfig
     return maxSegmentsInLoadQueue;
   }
 
-  public int getMaxSegmentsToMove()
-  {
-    return maxSegmentsToMove;
-  }
-
   public int getReplicationThrottleLimit()
   {
     return replicationThrottleLimit;
@@ -136,8 +121,35 @@ public class SegmentLoadingConfig
     return maxReplicaAssignmentsInRun;
   }
 
-  public int getPercentDecommSegmentsToMove()
+  public int getBalancerComputeThreads()
   {
-    return percentDecommSegmentsToMove;
+    return balancerComputeThreads;
+  }
+
+  /**
+   * Computes the number of threads to be used in the balancing executor.
+   * The number of used segments in a cluster is generally a good indicator of
+   * the cluster size and has been used here as a proxy for the actual number 
of
+   * segments that would be involved in cost computations.
+   * <p>
+   * The number of threads increases by 1 first for every 50k segments, then 
for
+   * every 75k segments and so on.
+   * <p>
+   * {@code numUsedSegments} is first divided by 1000 using integer division
+   * and then compared against the cumulative step values. The result is
+   * 1 below 50k, 2 below 100k, 3 below 175k, 4 below 250k, 5 below 350k,
+   * 6 below 450k, 7 below 600k and 8 otherwise, so the maximum of 8 is
+   * already reached at 600k segments.
+   *
+   * @param numUsedSegments Number of used segments in the cluster
+   * @return Number of {@code balancerComputeThreads} in the range [1, 8].
+   */
+  public static int computeNumBalancerThreads(int numUsedSegments)
+  {
+    // Add an extra thread when numUsedSegments increases by a step
+    final int[] stepValues = {50, 50, 75, 75, 100, 100, 150, 150}; // in thousands of segments
+
+    int remainder = numUsedSegments / 1000; // thousands of segments not yet covered by a step
+    for (int step = 0; step < stepValues.length; ++step) {
+      remainder -= stepValues[step];
+      if (remainder < 0) {
+        return step + 1;
+      }
+    }
+
+    return stepValues.length;
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
index 6d0e1b8ae8..cbee974452 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
@@ -255,6 +255,8 @@ public class CoordinatorRunStats
   {
     if (debugDimensions.isEmpty()) {
       return false;
+    } else if (rowKey.getValues().isEmpty()) {
+      return true;
     }
 
     for (Map.Entry<Dimension, String> entry : rowKey.getValues().entrySet()) {
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
index d1df912698..63a0944ed7 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
@@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.easymock.EasyMock;
+import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.junit.Before;
@@ -142,7 +143,7 @@ public class BalanceSegmentsProfiler
         .withDatabaseRuleManager(manager)
         .build();
 
-    BalanceSegments tester = new BalanceSegments();
+    BalanceSegments tester = new BalanceSegments(Duration.standardMinutes(1));
     RunRules runner = new RunRules(Set::size);
     watch.start();
     DruidCoordinatorRuntimeParams balanceParams = tester.run(params);
@@ -188,7 +189,7 @@ public class BalanceSegmentsProfiler
         
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
         .withSegmentAssignerUsing(loadQueueManager)
         .build();
-    BalanceSegments tester = new BalanceSegments();
+    BalanceSegments tester = new BalanceSegments(Duration.standardMinutes(1));
     watch.start();
     DruidCoordinatorRuntimeParams balanceParams = tester.run(params);
     System.out.println(watch.stop());
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index cf53708af0..0e75f95a1f 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -51,6 +51,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.ServerType;
 import 
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
+import 
org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
@@ -585,25 +586,12 @@ public class DruidCoordinatorTest extends CuratorTestBase
   @Test
   public void testBalancerThreadNumber()
   {
-    CoordinatorDynamicConfig dynamicConfig = 
EasyMock.createNiceMock(CoordinatorDynamicConfig.class);
-    
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(5).times(2);
-    
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(10).once();
-
-    JacksonConfigManager configManager = 
EasyMock.createNiceMock(JacksonConfigManager.class);
-    EasyMock.expect(
-        configManager.watch(
-            EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
-            EasyMock.anyObject(Class.class),
-            EasyMock.anyObject()
-        )
-    ).andReturn(new AtomicReference<>(dynamicConfig)).anyTimes();
-
     ScheduledExecutorFactory scheduledExecutorFactory = 
EasyMock.createNiceMock(ScheduledExecutorFactory.class);
-    EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory);
+    EasyMock.replay(scheduledExecutorFactory);
 
     DruidCoordinator c = new DruidCoordinator(
         druidCoordinatorConfig,
-        configManager,
+        EasyMock.createNiceMock(JacksonConfigManager.class),
         null,
         null,
         null,
@@ -617,7 +605,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         null,
         null,
         new CoordinatorCustomDutyGroups(ImmutableSet.of()),
-        null,
+        new RandomBalancerStrategyFactory(),
         null,
         null,
         null
@@ -628,20 +616,20 @@ public class DruidCoordinatorTest extends CuratorTestBase
     Assert.assertNull(c.getBalancerExec());
 
     // first initialization
-    c.initBalancerExecutor();
+    c.createBalancerStrategy(5);
     Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
     ListeningExecutorService firstExec = c.getBalancerExec();
     Assert.assertNotNull(firstExec);
 
     // second initialization, expect no changes as cachedBalancerThreadNumber 
is not changed
-    c.initBalancerExecutor();
+    c.createBalancerStrategy(5);
     Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
     ListeningExecutorService secondExec = c.getBalancerExec();
     Assert.assertNotNull(secondExec);
     Assert.assertSame(firstExec, secondExec);
 
     // third initialization, expect executor recreated as 
cachedBalancerThreadNumber is changed to 10
-    c.initBalancerExecutor();
+    c.createBalancerStrategy(10);
     Assert.assertEquals(10, c.getCachedBalancerThreadNumber());
     ListeningExecutorService thirdExec = c.getBalancerExec();
     Assert.assertNotNull(thirdExec);
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
index 11e36dea18..70fb2dd0b6 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
@@ -193,8 +193,6 @@ public class ServerHolderTest
     );
     Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0)));
     Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));
-    Assert.assertTrue(h1.hasSegmentLoaded(SEGMENTS.get(0).getId()));
-    Assert.assertFalse(h1.hasSegmentLoaded(SEGMENTS.get(1).getId()));
     Assert.assertFalse(h1.isLoadQueueFull());
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
index e4b3715960..576dfbb03d 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
@@ -31,8 +31,6 @@ import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
-import org.apache.druid.server.coordinator.stats.Dimension;
-import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 import org.junit.After;
@@ -317,7 +315,7 @@ public class CostBalancerStrategyTest
   }
 
   @Test
-  public void testGetAndResetStats()
+  public void testGetStats()
   {
     final ServerHolder serverA = new ServerHolder(
         createHistorical().toImmutableDruidServer(),
@@ -332,20 +330,13 @@ public class CostBalancerStrategyTest
 
     // Verify that computation stats have been tracked
     strategy.findServersToLoadSegment(segment, Arrays.asList(serverA, 
serverB));
-    CoordinatorRunStats computeStats = strategy.getAndResetStats();
+    CoordinatorRunStats computeStats = strategy.getStats();
 
-    final RowKey rowKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI)
-                                .with(Dimension.DESCRIPTION, "LOAD")
-                                .and(Dimension.TIER, "hot");
-    Assert.assertEquals(1L, computeStats.get(Stats.Balancer.COMPUTATION_COUNT, 
rowKey));
+    Assert.assertEquals(1L, 
computeStats.get(Stats.Balancer.COMPUTATION_COUNT));
 
-    long computeTime = computeStats.get(Stats.Balancer.COMPUTATION_TIME, 
rowKey);
+    long computeTime = computeStats.get(Stats.Balancer.COMPUTATION_TIME);
     Assert.assertTrue(computeTime >= 0 && computeTime <= 100);
     
Assert.assertFalse(computeStats.hasStat(Stats.Balancer.COMPUTATION_ERRORS));
-
-    // Verify that stats have been reset
-    computeStats = strategy.getAndResetStats();
-    Assert.assertEquals(0, computeStats.rowCount());
   }
 
   @Test
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java
new file mode 100644
index 0000000000..58c1cde409
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.balancer;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class SegmentToMoveCalculatorTest
+{
+
+  private static final Duration DEFAULT_COORDINATOR_PERIOD = 
Duration.standardMinutes(1);
+
+  /**
+   * 100 days x 100 partitions = 10,000 segments.
+   */
+  private static final List<DataSegment> WIKI_SEGMENTS
+      = CreateDataSegments.ofDatasource("wiki")
+                          .forIntervals(100, Granularities.DAY)
+                          .withNumPartitions(100)
+                          .eachOfSizeInMb(500);
+
+  /**
+   * 10 days * 1 partitions = 10 segments.
+   */
+  private static final List<DataSegment> KOALA_SEGMENTS
+      = CreateDataSegments.ofDatasource("koala")
+                          .forIntervals(10, Granularities.DAY)
+                          .eachOfSizeInMb(500);
+
+  private static final String TIER = "tier1";
+
+  @Test
+  public void testMaxSegmentsToMove1Thread()
+  {
+    Assert.assertEquals(0, computeMaxSegmentsToMove(0, 1));
+    Assert.assertEquals(50, computeMaxSegmentsToMove(50, 1));
+    Assert.assertEquals(100, computeMaxSegmentsToMove(100, 1));
+
+    Assert.assertEquals(100, computeMaxSegmentsToMove(512, 1));
+    Assert.assertEquals(200, computeMaxSegmentsToMove(1_024, 1));
+    Assert.assertEquals(300, computeMaxSegmentsToMove(1_536, 1));
+
+    Assert.assertEquals(1_900, computeMaxSegmentsToMove(10_000, 1));
+    Assert.assertEquals(9_700, computeMaxSegmentsToMove(50_000, 1));
+    Assert.assertEquals(19_500, computeMaxSegmentsToMove(100_000, 1));
+
+    Assert.assertEquals(10_000, computeMaxSegmentsToMove(200_000, 1));
+    Assert.assertEquals(4_000, computeMaxSegmentsToMove(500_000, 1));
+    Assert.assertEquals(2_000, computeMaxSegmentsToMove(1_000_000, 1));
+  }
+
+  @Test
+  public void testMaxSegmentsToMoveIncreasesWithCoordinatorPeriod()
+  {
+    Assert.assertEquals(5_000, computeMaxSegmentsToMoveInPeriod(200_000, 
Duration.millis(30_000)));
+    Assert.assertEquals(10_000, computeMaxSegmentsToMoveInPeriod(200_000, 
Duration.millis(60_000)));
+    Assert.assertEquals(15_000, computeMaxSegmentsToMoveInPeriod(200_000, 
Duration.millis(90_000)));
+    Assert.assertEquals(20_000, computeMaxSegmentsToMoveInPeriod(200_000, 
Duration.millis(120_000)));
+
+    Assert.assertEquals(2_000, computeMaxSegmentsToMoveInPeriod(500_000, 
Duration.millis(30_000)));
+    Assert.assertEquals(4_000, computeMaxSegmentsToMoveInPeriod(500_000, 
Duration.millis(60_000)));
+    Assert.assertEquals(6_000, computeMaxSegmentsToMoveInPeriod(500_000, 
Duration.millis(90_000)));
+    Assert.assertEquals(8_000, computeMaxSegmentsToMoveInPeriod(500_000, 
Duration.millis(120_000)));
+  }
+
+  @Test
+  public void testMaxSegmentsToMove8Threads()
+  {
+    Assert.assertEquals(0, computeMaxSegmentsToMove(0, 8));
+    Assert.assertEquals(50, computeMaxSegmentsToMove(50, 8));
+    Assert.assertEquals(100, computeMaxSegmentsToMove(100, 8));
+
+    Assert.assertEquals(100, computeMaxSegmentsToMove(512, 8));
+    Assert.assertEquals(200, computeMaxSegmentsToMove(1_024, 8));
+    Assert.assertEquals(300, computeMaxSegmentsToMove(1_536, 8));
+
+    Assert.assertEquals(33_000, computeMaxSegmentsToMove(500_000, 8));
+    Assert.assertEquals(16_000, computeMaxSegmentsToMove(1_000_000, 8));
+    Assert.assertEquals(8_000, computeMaxSegmentsToMove(2_000_000, 8));
+    Assert.assertEquals(3_000, computeMaxSegmentsToMove(5_000_000, 8));
+    Assert.assertEquals(1_000, computeMaxSegmentsToMove(10_000_000, 8));
+  }
+
+  @Test
+  public void testMaxSegmentsToMoveWithComputedNumThreads()
+  {
+    Assert.assertEquals(1_900, computeNumThreadsAndMaxToMove(10_000));
+    Assert.assertEquals(9_700, computeNumThreadsAndMaxToMove(50_000));
+
+    Assert.assertEquals(19_500, computeNumThreadsAndMaxToMove(100_000));
+    Assert.assertEquals(39_000, computeNumThreadsAndMaxToMove(200_000));
+    Assert.assertEquals(29_000, computeNumThreadsAndMaxToMove(500_000));
+
+    Assert.assertEquals(16_000, computeNumThreadsAndMaxToMove(1_000_000));
+    Assert.assertEquals(8_000, computeNumThreadsAndMaxToMove(2_000_000));
+    Assert.assertEquals(1_000, computeNumThreadsAndMaxToMove(10_000_000));
+  }
+
+  /**
+   * Verifies the minimum number of segments to move in a tier for a range of
+   * tier sizes.
+   * <p>
+   * For the sampled inputs, the expected minimum equals the total when the
+   * tier has at most 100 segments, stays at 100 up to 100k segments and
+   * grows for larger tiers.
+   */
+  @Test
+  public void testMinSegmentsToMove()
+  {
+    // Tiny tiers: the minimum is the whole tier
+    Assert.assertEquals(0, computeMinSegmentsToMove(0));
+    Assert.assertEquals(50, computeMinSegmentsToMove(50));
+
+    Assert.assertEquals(100, computeMinSegmentsToMove(100));
+    Assert.assertEquals(100, computeMinSegmentsToMove(1_000));
+
+    // Larger tiers: flat at 100 up to 100k segments, then increasing
+    Assert.assertEquals(100, computeMinSegmentsToMove(20_000));
+    Assert.assertEquals(100, computeMinSegmentsToMove(50_000));
+    Assert.assertEquals(100, computeMinSegmentsToMove(100_000));
+    Assert.assertEquals(300, computeMinSegmentsToMove(200_000));
+    Assert.assertEquals(700, computeMinSegmentsToMove(500_000));
+    Assert.assertEquals(1_500, computeMinSegmentsToMove(1_000_000));
+    Assert.assertEquals(15_200, computeMinSegmentsToMove(10_000_000));
+  }
+
+  /**
+   * Verifies that the minimum number of segments to move changes in discrete
+   * steps: each pair of assertions checks the value just below a boundary
+   * and at the boundary, and the expected value jumps by 100 across it.
+   * <p>
+   * NOTE(review): every sampled boundary is a multiple of 65,536 (2^16) and
+   * the expected values match 100 * (totalSegments / 65,536); this rule is
+   * inferred from the assertions only - confirm against
+   * SegmentToMoveCalculator.
+   */
+  @Test
+  public void testMinSegmentsToMoveIncreasesInSteps()
+  {
+    Assert.assertEquals(100, computeMinSegmentsToMove(131_071));
+    Assert.assertEquals(200, computeMinSegmentsToMove(131_072));
+
+    Assert.assertEquals(500, computeMinSegmentsToMove(393_215));
+    Assert.assertEquals(600, computeMinSegmentsToMove(393_216));
+
+    Assert.assertEquals(900, computeMinSegmentsToMove(655_359));
+    Assert.assertEquals(1000, computeMinSegmentsToMove(655_360));
+
+    Assert.assertEquals(9_900, computeMinSegmentsToMove(6_553_599));
+    Assert.assertEquals(10_000, computeMinSegmentsToMove(6_553_600));
+  }
+
+  /**
+   * With two servers that hold the same segments, the number of segments
+   * needed to fix skew is 0, so the number of segments to move in the tier
+   * is expected to equal the minimum for the tier.
+   */
+  @Test
+  public void testMinSegmentsArePickedForMoveWhenNoSkew()
+  {
+    // Both servers are created with the same list of wiki segments
+    final List<ServerHolder> historicals = Arrays.asList(
+        createServer("A", WIKI_SEGMENTS),
+        createServer("B", WIKI_SEGMENTS)
+    );
+
+    // NOTE(review): 20_000 is presumably the total number of segments in the
+    // tier (2 servers x WIKI_SEGMENTS.size()) - confirm against WIKI_SEGMENTS
+    final int minSegmentsToMove = 
SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(20_000);
+    Assert.assertEquals(100, minSegmentsToMove);
+
+    final int segmentsToMoveToFixSkew = SegmentToMoveCalculator
+        .computeNumSegmentsToMoveToBalanceTier(TIER, historicals);
+    Assert.assertEquals(0, segmentsToMoveToFixSkew);
+
+    // Find segmentsToMove with no limit on maxSegmentsToMove
+    final int segmentsToMove = SegmentToMoveCalculator
+        .computeNumSegmentsToMoveInTier(TIER, historicals, Integer.MAX_VALUE);
+
+    Assert.assertEquals(minSegmentsToMove, segmentsToMove);
+  }
+
+  /**
+   * With one server holding all the wiki segments and the other holding
+   * none, the number of segments needed to fix skew (5,000) exceeds the
+   * minimum for the tier (100), and the number of segments to move in the
+   * tier is expected to equal the skew-fixing number.
+   */
+  @Test
+  public void testHalfSegmentsArePickedForMoveWhenFullSkew()
+  {
+    // Server A has every wiki segment, server B is empty
+    final List<ServerHolder> historicals = Arrays.asList(
+        createServer("A", WIKI_SEGMENTS),
+        createServer("B", Collections.emptyList())
+    );
+
+    // NOTE(review): 10_000 is presumably WIKI_SEGMENTS.size(), i.e. the total
+    // number of segments in the tier - confirm against WIKI_SEGMENTS
+    final int minSegmentsToMove = 
SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(10_000);
+    Assert.assertEquals(100, minSegmentsToMove);
+
+    final int segmentsToMoveToFixSkew = SegmentToMoveCalculator
+        .computeNumSegmentsToMoveToBalanceTier(TIER, historicals);
+    Assert.assertEquals(5_000, segmentsToMoveToFixSkew);
+
+    // Find segmentsToMove with no limit on maxSegmentsToMove
+    final int segmentsToMove = SegmentToMoveCalculator
+        .computeNumSegmentsToMoveInTier(TIER, historicals, Integer.MAX_VALUE);
+
+    Assert.assertEquals(segmentsToMoveToFixSkew, segmentsToMove);
+  }
+
+  /**
+   * Verifies the per-datasource count balancing computation when two
+   * datasources are present: koala segments are on both servers, wiki
+   * segments are only on server A, and the expected result is half the
+   * number of wiki segments.
+   */
+  @Test
+  public void testDatasourceWithLargestGapDeterminesNumToBalanceCounts()
+  {
+    // Both servers have all koala segments but only A has wiki segments
+    List<DataSegment> segmentsForServerA = new ArrayList<>(WIKI_SEGMENTS);
+    segmentsForServerA.addAll(KOALA_SEGMENTS);
+
+    final List<ServerHolder> historicals = Arrays.asList(
+        createServer("A", segmentsForServerA),
+        createServer("B", KOALA_SEGMENTS)
+    );
+
+    // Verify that half the wiki segments need to be moved for balance
+    int numToMoveToBalanceCount = SegmentToMoveCalculator
+        .computeSegmentsToMoveToBalanceCountsPerDatasource(TIER, historicals);
+    Assert.assertEquals(WIKI_SEGMENTS.size() / 2, numToMoveToBalanceCount);
+  }
+
+  /**
+   * Computes the max segments to move per tier for the given total segment
+   * count and number of balancer threads, using DEFAULT_COORDINATOR_PERIOD
+   * as the coordinator period.
+   */
+  private static int computeMaxSegmentsToMove(int totalSegments, int 
numThreads)
+  {
+    return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
+        totalSegments,
+        numThreads,
+        DEFAULT_COORDINATOR_PERIOD
+    );
+  }
+
+  /**
+   * Computes the max segments to move per tier for the given total segment
+   * count and coordinator period, always passing 1 as the number of
+   * balancer threads.
+   */
+  private static int computeMaxSegmentsToMoveInPeriod(int totalSegments, 
Duration coordinatorPeriod)
+  {
+    return 
SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(totalSegments, 1, 
coordinatorPeriod);
+  }
+
+  /**
+   * Computes the max segments to move per tier for the given total segment
+   * count, with the number of balancer threads obtained from
+   * SegmentLoadingConfig.computeNumBalancerThreads for the same count and
+   * DEFAULT_COORDINATOR_PERIOD as the coordinator period.
+   */
+  private static int computeNumThreadsAndMaxToMove(int totalSegments)
+  {
+    return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
+        totalSegments,
+        SegmentLoadingConfig.computeNumBalancerThreads(totalSegments),
+        DEFAULT_COORDINATOR_PERIOD
+    );
+  }
+
+  /**
+   * Shorthand for SegmentToMoveCalculator.computeMinSegmentsToMoveInTier,
+   * used to keep the assertions in the tests above short.
+   */
+  private static int computeMinSegmentsToMove(int totalSegmentsInTier)
+  {
+    return 
SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(totalSegmentsInTier);
+  }
+
+  /**
+   * Creates a ServerHolder for a historical server that contains the given
+   * segments and is backed by a new TestLoadQueuePeon.
+   *
+   * @param name     passed as the first two arguments of the DruidServer
+   *                 constructor
+   * @param segments segments added to the server before it is converted to
+   *                 an immutable server
+   */
+  private static ServerHolder createServer(String name, List<DataSegment> 
segments)
+  {
+    // NOTE(review): 10L << 30 is presumably the max size of the server in
+    // bytes (10 GiB), and "tier1" presumably matches the TIER constant used
+    // by the tests - confirm against DruidServer and the TIER declaration
+    final DruidServer server
+        = new DruidServer(name, name, null, 10L << 30, ServerType.HISTORICAL, 
"tier1", 1);
+    segments.forEach(server::addDataSegment);
+    return new ServerHolder(server.toImmutableDruidServer(), new 
TestLoadQueuePeon());
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
index c608f58d9a..aada7b3214 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
+import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
@@ -378,7 +379,7 @@ public class BalanceSegmentsTest
 
   private CoordinatorRunStats runBalancer(DruidCoordinatorRuntimeParams params)
   {
-    params = new BalanceSegments().run(params);
+    params = new BalanceSegments(Duration.standardMinutes(1)).run(params);
     if (params == null) {
       Assert.fail("BalanceSegments duty returned null params");
       return new CoordinatorRunStats();
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfigTest.java
new file mode 100644
index 0000000000..947ac45ca7
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfigTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the static computations in SegmentLoadingConfig.
+ */
+public class SegmentLoadingConfigTest
+{
+
+  /**
+   * Verifies the number of balancer threads computed for a range of used
+   * segment counts. The expected values never go below 1 or above 8, and
+   * do not decrease as the number of used segments grows.
+   */
+  @Test
+  public void testComputeNumBalancerThreads()
+  {
+    Assert.assertEquals(1, computeBalancerThreads(0));
+    Assert.assertEquals(1, computeBalancerThreads(30_000));
+    Assert.assertEquals(2, computeBalancerThreads(50_000));
+    Assert.assertEquals(3, computeBalancerThreads(100_000));
+
+    Assert.assertEquals(4, computeBalancerThreads(175_000));
+    Assert.assertEquals(5, computeBalancerThreads(250_000));
+    Assert.assertEquals(6, computeBalancerThreads(350_000));
+    Assert.assertEquals(7, computeBalancerThreads(450_000));
+    Assert.assertEquals(8, computeBalancerThreads(600_000));
+
+    // Beyond 600k segments the expected thread count stays at 8
+    Assert.assertEquals(8, computeBalancerThreads(1_000_000));
+    Assert.assertEquals(8, computeBalancerThreads(10_000_000));
+  }
+
+  /**
+   * Shorthand for SegmentLoadingConfig.computeNumBalancerThreads.
+   */
+  private int computeBalancerThreads(int numUsedSegments)
+  {
+    return SegmentLoadingConfig.computeNumBalancerThreads(numUsedSegments);
+  }
+
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index 984fe217af..bd1af4a084 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -217,8 +217,8 @@ public abstract class CoordinatorSimulationBaseTest 
implements
   {
     static final String ASSIGNED_COUNT = "segment/assigned/count";
     static final String MOVED_COUNT = "segment/moved/count";
+    static final String MOVE_SKIPPED = "segment/moveSkipped/count";
     static final String DROPPED_COUNT = "segment/dropped/count";
-    static final String OVERSHADOWED_COUNT = "segment/overshadowed/count";
     static final String DELETED_COUNT = "segment/deleted/count";
     static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count";
     static final String DROP_QUEUE_COUNT = "segment/dropQueue/count";
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index c5d5d94293..e40bf8ee16 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -74,7 +74,6 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class CoordinatorSimulationBuilder
 {
-  private static final long DEFAULT_COORDINATOR_PERIOD = 100L;
   private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper()
       .setInjectableValues(
           new InjectableValues.Std().addValue(
@@ -463,8 +462,8 @@ public class CoordinatorSimulationBuilder
 
       this.coordinatorConfig = new TestDruidCoordinatorConfig.Builder()
           .withCoordinatorStartDelay(new Duration(1L))
-          .withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
-          .withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
+          .withCoordinatorPeriod(Duration.standardMinutes(1))
+          .withCoordinatorKillPeriod(Duration.millis(100))
           .withLoadQueuePeonType("http")
           .withCoordinatorKillIgnoreDurationToRetain(false)
           .build();
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
index 383b5fe3c6..141c6ba7a1 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
@@ -26,6 +26,8 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Coordinator simulation test to verify behaviour of segment balancing.
@@ -229,4 +231,39 @@ public class SegmentBalancingTest extends 
CoordinatorSimulationBaseTest
     Assert.assertTrue(getValue(Metric.MOVED_COUNT, null).intValue() > 0);
   }
 
+  /**
+   * Simulates a cluster that becomes skewed when new historicals are added.
+   * <p>
+   * Run 1 loads all koala segments on 10 historicals and expects no moves.
+   * Run 2 adds 10 more historicals and expects 500 segments to be moved in
+   * that single coordinator cycle, with no skipped moves.
+   */
+  @Test(timeout = 60000L)
+  public void testMaxSegmentsAreMovedWhenClusterIsSkewed()
+  {
+    // Add 10 historicals of size 1 TB each
+    // NOTE(review): 1_000_000 is 1 TB only if createHistorical takes the
+    // size in MB - confirm against createHistorical
+    final long size1TB = 1_000_000;
+    List<DruidServer> historicals
+        = IntStream.range(0, 10)
+                   .mapToObj(i -> createHistorical(i + 1, Tier.T1, size1TB))
+                   .collect(Collectors.toList());
+
+    CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(Segments.KOALA_100X100D)
+                             .withServers(historicals)
+                             .withRules(DS.KOALA, Load.on(Tier.T1, 
1).forever())
+                             .build();
+
+    startSimulation(sim);
+
+    // Run 1: All segments are assigned to the 10 historicals
+    // NOTE(review): the assigned count of 10,000 presumably equals the
+    // number of segments in Segments.KOALA_100X100D - confirm
+    runCoordinatorCycle();
+    verifyValue(Metric.ASSIGNED_COUNT, 10_000L);
+    verifyNotEmitted(Metric.MOVED_COUNT);
+    verifyValue(Metric.MOVE_SKIPPED, 100L);
+
+    // Run 2: Add 10 more historicals, some segments are moved to them
+    for (int i = 11; i <= 20; ++i) {
+      addServer(createHistorical(i, Tier.T1, size1TB));
+    }
+
+    runCoordinatorCycle();
+    verifyValue(Metric.MOVED_COUNT, 500L);
+    verifyNotEmitted(Metric.MOVE_SKIPPED);
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to