This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bd6bd0d4f Improve CostBalancerStrategy, deprecate cachingCost (#14484)
4bd6bd0d4f is described below

commit 4bd6bd0d4f61effa392cb5736d392ca12f7f3b40
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Jun 27 13:23:29 2023 +0530

    Improve CostBalancerStrategy, deprecate cachingCost (#14484)
    
    Changes to `cost` strategy:
    - In every `ServerHolder`, track the number of segments per datasource per interval
    - Perform cost computations for a given interval just once, and then multiply by a constant
    factor to account for the total segment count in that interval
    - Do not perform joint cost computations with segments that are outside the compute interval
    (± 45 days) for the segment being considered for move
    - Remove metrics `segment/cost/*` as they were coordinator killers! Turning on these metrics
    (by setting `emitBalancingStats` to true) has often caused the coordinator to be stuck for hours.
    Moreover, they are too complicated to decipher and do not provide any meaningful insight into
    a Druid cluster.
    - Add new simpler metrics `segment/balancer/compute/*` to track cost computation time,
    count and errors.
    
    Other changes:
    - Remove flaky test from `CostBalancerStrategyTest`.
    - Add tests to verify that computed cost has remained unchanged
    - Remove usages of mock `BalancerStrategy` from `LoadRuleTest`, `BalanceSegmentsTest`
    - Clean up `BalancerStrategy` interface
---
 .../druid/server/coordinator/DruidCoordinator.java |  24 +-
 .../coordinator/SegmentCountsPerInterval.java      |  67 ++++++
 .../druid/server/coordinator/ServerHolder.java     |  74 +++---
 .../coordinator/balancer/BalancerStrategy.java     |  56 ++---
 .../balancer/CachingCostBalancerStrategy.java      |   9 +-
 .../CachingCostBalancerStrategyFactory.java        |   4 +
 .../coordinator/balancer/CostBalancerStrategy.java | 266 +++++++++++----------
 .../DiskNormalizedCostBalancerStrategy.java        |   7 +-
 .../balancer/RandomBalancerStrategy.java           |  14 +-
 .../coordinator/balancer/TierSegmentBalancer.java  |  19 +-
 .../server/coordinator/duty/BalanceSegments.java   |   6 +
 .../duty/CollectSegmentAndServerStats.java         |  26 +-
 .../server/coordinator/duty/CompactSegments.java   |   3 +-
 .../duty/MarkOvershadowedSegmentsAsUnused.java     |   5 +-
 .../druid/server/coordinator/duty/RunRules.java    |   2 +-
 .../coordinator/loading/HttpLoadQueuePeon.java     |   6 +-
 .../coordinator/loading/SegmentLoadingConfig.java  |   2 +-
 .../loading/StrategicSegmentAssigner.java          |  26 +-
 .../coordinator/stats/CoordinatorRunStats.java     |  21 +-
 .../druid/server/coordinator/stats/RowKey.java     |  23 +-
 .../druid/server/coordinator/stats/Stats.java      |  12 +-
 .../coordinator/CoordinatorRunStatsTest.java       |  16 +-
 .../balancer/CachingCostBalancerStrategyTest.java  |   3 +-
 .../balancer/CostBalancerStrategyTest.java         | 137 ++++++-----
 .../coordinator/duty/BalanceSegmentsTest.java      |  81 ++-----
 .../coordinator/duty/CompactSegmentsTest.java      |   2 +-
 .../server/coordinator/duty/RunRulesTest.java      |  13 +-
 .../server/coordinator/rules/LoadRuleTest.java     | 158 +++---------
 28 files changed, 515 insertions(+), 567 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 0c5e79bdcb..c9d334968e 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.coordinator;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
@@ -98,6 +99,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -672,7 +674,7 @@ public class DruidCoordinator
     {
       try {
         log.info("Starting coordinator run for group [%s]", dutyGroupName);
-        final long globalStart = System.currentTimeMillis();
+        final Stopwatch groupRunTime = Stopwatch.createStarted();
 
         synchronized (lock) {
           if (!coordLeaderSelector.isLeader()) {
@@ -719,23 +721,25 @@ public class DruidCoordinator
           log.info("Coordination has been paused. Duties will not run until 
coordination is resumed.");
         }
 
+        final Stopwatch dutyRunTime = Stopwatch.createUnstarted();
         for (CoordinatorDuty duty : duties) {
           // Don't read state and run state in the same duty otherwise racy 
conditions may exist
           if (!coordinationPaused
               && coordLeaderSelector.isLeader()
               && startingLeaderCounter == coordLeaderSelector.localTerm()) {
 
-            final long start = System.currentTimeMillis();
+            dutyRunTime.reset().start();
             params = duty.run(params);
-            final long end = System.currentTimeMillis();
+            dutyRunTime.stop();
 
             final String dutyName = duty.getClass().getName();
             if (params == null) {
               log.info("Stopping run for group [%s] on request of duty [%s].", 
dutyGroupName, dutyName);
               return;
             } else {
-              final RowKey rowKey = RowKey.builder().add(Dimension.DUTY, 
dutyName).build();
-              
params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, 
end - start);
+              final RowKey rowKey = RowKey.of(Dimension.DUTY, dutyName);
+              final long dutyRunMillis = 
dutyRunTime.elapsed(TimeUnit.MILLISECONDS);
+              
params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, 
dutyRunMillis);
             }
           }
         }
@@ -745,9 +749,9 @@ public class DruidCoordinator
         if (allStats.rowCount() > 0) {
           final AtomicInteger emittedCount = new AtomicInteger();
           allStats.forEachStat(
-              (dimensionValues, stat, value) -> {
+              (stat, dimensions, value) -> {
                 if (stat.shouldEmit()) {
-                  emitStat(stat, dimensionValues, value);
+                  emitStat(stat, dimensions.getValues(), value);
                   emittedCount.incrementAndGet();
                 }
               }
@@ -760,7 +764,7 @@ public class DruidCoordinator
         }
 
         // Emit the runtime of the full DutiesRunnable
-        final long runMillis = System.currentTimeMillis() - globalStart;
+        final long runMillis = 
groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS);
         emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), 
runMillis);
         log.info("Finished coordinator run for group [%s] in [%d] ms", 
dutyGroupName, runMillis);
       }
@@ -771,10 +775,6 @@ public class DruidCoordinator
 
     private void emitStat(CoordinatorStat stat, Map<Dimension, String> 
dimensionValues, long value)
     {
-      if (stat.equals(Stats.Balancer.NORMALIZED_COST_X_1000)) {
-        value = value / 1000;
-      }
-
       ServiceMetricEvent.Builder eventBuilder = new 
ServiceMetricEvent.Builder()
           .setDimension(Dimension.DUTY_GROUP.reportedName(), dutyGroupName);
       dimensionValues.forEach(
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
new file mode 100644
index 0000000000..ea1f81ee6d
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMaps;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Maintains a count of segments for each datasource and interval.
+ */
+public class SegmentCountsPerInterval
+{
+  private final Map<String, Object2IntMap<Interval>>
+      datasourceIntervalToSegmentCount = new HashMap<>();
+  private final Object2IntMap<Interval> intervalToTotalSegmentCount = new 
Object2IntOpenHashMap<>();
+
+  public void addSegment(DataSegment segment)
+  {
+    updateCountInInterval(segment, 1);
+  }
+
+  public void removeSegment(DataSegment segment)
+  {
+    updateCountInInterval(segment, -1);
+  }
+
+  public Object2IntMap<Interval> getIntervalToSegmentCount(String datasource)
+  {
+    return datasourceIntervalToSegmentCount.getOrDefault(datasource, 
Object2IntMaps.emptyMap());
+  }
+
+  public Object2IntMap<Interval> getIntervalToTotalSegmentCount()
+  {
+    return intervalToTotalSegmentCount;
+  }
+
+  private void updateCountInInterval(DataSegment segment, int delta)
+  {
+    intervalToTotalSegmentCount.mergeInt(segment.getInterval(), delta, 
Integer::sum);
+    datasourceIntervalToSegmentCount
+        .computeIfAbsent(segment.getDataSource(), ds -> new 
Object2IntOpenHashMap<>())
+        .mergeInt(segment.getInterval(), delta, Integer::sum);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java 
b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index 79f2c38ab2..7947f1c1b3 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -32,11 +32,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -75,11 +73,7 @@ public class ServerHolder implements Comparable<ServerHolder>
    */
   private final Map<DataSegment, SegmentAction> queuedSegments = new 
HashMap<>();
 
-  /**
-   * Segments that are expected to be loaded on this server once all the
-   * operations in progress have completed.
-   */
-  private final Set<DataSegment> projectedSegments = new HashSet<>();
+  private final SegmentCountsPerInterval projectedSegments = new 
SegmentCountsPerInterval();
 
   public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon)
   {
@@ -133,31 +127,31 @@ public class ServerHolder implements 
Comparable<ServerHolder>
       AtomicInteger loadingReplicaCount
   )
   {
-    projectedSegments.addAll(server.iterateAllSegments());
+    for (DataSegment segment : server.iterateAllSegments()) {
+      projectedSegments.addSegment(segment);
+    }
 
     final List<SegmentHolder> expiredSegments = new ArrayList<>();
-    peon.getSegmentsInQueue().forEach(
-        (holder) -> {
-          int runsInQueue = holder.incrementAndGetRunsInQueue();
-          if (runsInQueue > maxLifetimeInQueue) {
-            expiredSegments.add(holder);
-          }
-
-          final SegmentAction action = holder.getAction();
-          addToQueuedSegments(holder.getSegment(), simplify(action));
-
-          if (action == SegmentAction.MOVE_TO) {
-            movingSegmentCount.incrementAndGet();
-          }
-          if (action == SegmentAction.REPLICATE) {
-            loadingReplicaCount.incrementAndGet();
-          }
-        }
-    );
-
-    peon.getSegmentsMarkedToDrop().forEach(
-        segment -> addToQueuedSegments(segment, SegmentAction.MOVE_FROM)
-    );
+    for (SegmentHolder holder : peon.getSegmentsInQueue()) {
+      int runsInQueue = holder.incrementAndGetRunsInQueue();
+      if (runsInQueue > maxLifetimeInQueue) {
+        expiredSegments.add(holder);
+      }
+
+      final SegmentAction action = holder.getAction();
+      addToQueuedSegments(holder.getSegment(), simplify(action));
+
+      if (action == SegmentAction.MOVE_TO) {
+        movingSegmentCount.incrementAndGet();
+      }
+      if (action == SegmentAction.REPLICATE) {
+        loadingReplicaCount.incrementAndGet();
+      }
+    }
+
+    for (DataSegment segment : peon.getSegmentsMarkedToDrop()) {
+      addToQueuedSegments(segment, SegmentAction.MOVE_FROM);
+    }
 
     if (!expiredSegments.isEmpty()) {
       List<SegmentHolder> expiredSegmentsSubList =
@@ -251,11 +245,21 @@ public class ServerHolder implements 
Comparable<ServerHolder>
    * Segments that are expected to be loaded on this server once all the
    * operations in progress have completed.
    */
-  public Set<DataSegment> getProjectedSegments()
+  public SegmentCountsPerInterval getProjectedSegments()
   {
     return projectedSegments;
   }
 
+  public boolean isProjectedSegment(DataSegment segment)
+  {
+    SegmentAction action = getActionOnSegment(segment);
+    if (action == null) {
+      return hasSegmentLoaded(segment.getId());
+    } else {
+      return action.isLoad();
+    }
+  }
+
   /**
    * Segments that are currently in the queue for being loaded on this server.
    * This does not include segments that are being moved to this server.
@@ -362,10 +366,10 @@ public class ServerHolder implements 
Comparable<ServerHolder>
 
     // Add to projected if load is started, remove from projected if drop has 
started
     if (action.isLoad()) {
-      projectedSegments.add(segment);
+      projectedSegments.addSegment(segment);
       sizeOfLoadingSegments += segment.getSize();
     } else {
-      projectedSegments.remove(segment);
+      projectedSegments.removeSegment(segment);
       if (action == SegmentAction.DROP) {
         sizeOfDroppingSegments += segment.getSize();
       }
@@ -379,10 +383,10 @@ public class ServerHolder implements 
Comparable<ServerHolder>
     queuedSegments.remove(segment);
 
     if (action.isLoad()) {
-      projectedSegments.remove(segment);
+      projectedSegments.removeSegment(segment);
       sizeOfLoadingSegments -= segment.getSize();
     } else {
-      projectedSegments.add(segment);
+      projectedSegments.addSegment(segment);
       if (action == SegmentAction.DROP) {
         sizeOfDroppingSegments -= segment.getSize();
       }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
index 7f27648d3f..e133430b1b 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
@@ -20,70 +20,60 @@
 package org.apache.druid.server.coordinator.balancer;
 
 import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.duty.BalanceSegments;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NavigableSet;
 
 /**
- * This interface describes the coordinator balancing strategy, which is 
responsible for making decisions on where
- * to place {@link DataSegment}s on historical servers (described by {@link 
ServerHolder}). The balancing strategy
- * is used by {@link org.apache.druid.server.coordinator.rules.LoadRule} to 
assign and drop segments, and by
- * {@link BalanceSegments} to migrate segments between historicals.
+ * Segment balancing strategy, used in every coordinator run by
+ * {@link org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner}
+ * to choose optimal servers to load, move or drop a segment.
  */
 public interface BalancerStrategy
 {
 
   /**
-   * Finds the best server to move a segment to according to the balancing 
strategy.
+   * Finds the best server from the list of {@code destinationServers} to load
+   * the {@code segmentToMove}, if it is moved from the {@code sourceServer}.
+   * <p>
+   * In order to avoid unnecessary moves when the segment is already optimally 
placed,
+   * include the {@code sourceServer} in the list of {@code 
destinationServers}.
    *
-   * @param proposalSegment    segment to move
-   * @param sourceServer       Server the segment is currently placed on.
-   * @param destinationServers servers to consider as move destinations
-   * @return The server to move to, or null if no move should be made or no 
server is suitable
+   * @return The server to move to, or null if the segment is already 
optimally placed.
    */
   @Nullable
   ServerHolder findDestinationServerToMoveSegment(
-      DataSegment proposalSegment,
+      DataSegment segmentToMove,
       ServerHolder sourceServer,
       List<ServerHolder> destinationServers
   );
 
   /**
-   * Finds the best servers on which to place the {@code proposalSegment}.
-   * This method can be used both for placing the first copy of a segment
-   * in the tier or a replica of the segment.
+   * Finds the best servers to load the given segment. This method can be used
+   * both for placing the first copy of a segment in a tier or a replica of an
+   * already available segment.
    *
-   * @param proposalSegment segment to place on servers
-   * @param serverHolders   servers to consider as segment homes
-   * @return Iterator over the best servers (in order) on which the segment
-   * can be placed.
+   * @return Iterator over the best servers (in order of preference) to load
+   * the segment.
    */
   Iterator<ServerHolder> findServersToLoadSegment(
-      DataSegment proposalSegment,
+      DataSegment segmentToLoad,
       List<ServerHolder> serverHolders
   );
 
   /**
-   * Returns an iterator for a set of servers to drop from, ordered by 
preference of which server to drop from first
-   * for a given drop strategy. One or more segments may be dropped, depending 
on how much the segment is
-   * over-replicated.
-   * @param toDropSegment segment to drop from one or more servers
-   * @param serverHolders set of historicals to consider dropping from
-   * @return Iterator for set of historicals, ordered by drop preference
+   * Finds the best servers to drop the given segment.
+   *
+   * @return Iterator over the servers (in order of preference) to drop the 
segment
    */
-  Iterator<ServerHolder> pickServersToDropSegment(DataSegment toDropSegment, 
NavigableSet<ServerHolder> serverHolders);
+  Iterator<ServerHolder> findServersToDropSegment(DataSegment segmentToDrop, 
List<ServerHolder> serverHolders);
 
   /**
-   * Add balancing strategy stats during the 'balanceTier' operation of
-   * {@link BalanceSegments} to be included
-   * @param tier historical tier being balanced
-   * @param stats stats object to add balancing strategy stats to
-   * @param serverHolderList servers in tier being balanced
+   * Returns the stats collected by the strategy in the current run and resets
+   * the stats collector for the next run.
    */
-  void emitStats(String tier, CoordinatorRunStats stats, List<ServerHolder> 
serverHolderList);
+  CoordinatorRunStats getAndResetStats();
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategy.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategy.java
index 424657991c..eda9928915 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategy.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategy.java
@@ -28,10 +28,8 @@ import org.apache.druid.timeline.DataSegment;
 import java.util.Collections;
 import java.util.Set;
 
-
 public class CachingCostBalancerStrategy extends CostBalancerStrategy
 {
-
   private final ClusterCostCache clusterCostCache;
 
   public CachingCostBalancerStrategy(ClusterCostCache clusterCostCache, 
ListeningExecutorService exec)
@@ -41,13 +39,8 @@ public class CachingCostBalancerStrategy extends 
CostBalancerStrategy
   }
 
   @Override
-  protected double computeCost(DataSegment proposalSegment, ServerHolder 
server, boolean includeCurrentServer)
+  protected double computePlacementCost(DataSegment proposalSegment, 
ServerHolder server)
   {
-    // (optional) Don't include server if it cannot load the segment
-    if (!includeCurrentServer && !server.canLoadSegment(proposalSegment)) {
-      return Double.POSITIVE_INFINITY;
-    }
-
     final String serverName = server.getServer().getName();
 
     double cost = clusterCostCache.computeCost(serverName, proposalSegment);
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
index 726dffe690..0ddacaead7 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
@@ -125,6 +125,10 @@ public class CachingCostBalancerStrategyFactory implements 
BalancerStrategyFacto
   @Override
   public BalancerStrategy createBalancerStrategy(final 
ListeningExecutorService exec)
   {
+    LOG.warn(
+        "'cachingCost' balancer strategy has been deprecated as it can lead to"
+        + " unbalanced clusters. Use 'cost' strategy instead."
+    );
     if (!isInitialized() && config.isAwaitInitialization()) {
       try {
         final long startMillis = System.currentTimeMillis();
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
index 6d165c1f34..aae907bb59 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
@@ -19,16 +19,21 @@
 
 package org.apache.druid.server.coordinator.balancer;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import org.apache.commons.math3.util.FastMath;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.coordinator.SegmentCountsPerInterval;
 import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.loading.SegmentAction;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
 import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
@@ -38,11 +43,10 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NavigableSet;
 import java.util.PriorityQueue;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 public class CostBalancerStrategy implements BalancerStrategy
 {
@@ -63,6 +67,20 @@ public class CostBalancerStrategy implements BalancerStrategy
       = Comparator.<Pair<Double, ServerHolder>, Double>comparing(pair -> 
pair.lhs)
       .thenComparing(pair -> pair.rhs);
 
+  private final CoordinatorRunStats stats = new CoordinatorRunStats();
+
+  public static double computeJointSegmentsCost(DataSegment segment, 
Iterable<DataSegment> segmentSet)
+  {
+    final Interval costComputeInterval = getCostComputeInterval(segment);
+    double totalCost = 0;
+    for (DataSegment s : segmentSet) {
+      if (costComputeInterval.overlaps(s.getInterval())) {
+        totalCost += computeJointSegmentsCost(segment, s);
+      }
+    }
+    return totalCost;
+  }
+
   /**
    * This defines the unnormalized cost function between two segments.
    *
@@ -83,15 +101,20 @@ public class CostBalancerStrategy implements 
BalancerStrategy
     final Interval intervalA = segmentA.getInterval();
     final Interval intervalB = segmentB.getInterval();
 
+    // constant cost-multiplier for segments of the same datsource
+    final double multiplier = 
segmentA.getDataSource().equals(segmentB.getDataSource())
+                              ? 2.0 : 1.0;
+    return intervalCost(intervalA, intervalB) * multiplier;
+  }
+
+  public static double intervalCost(Interval intervalA, Interval intervalB)
+  {
     final double t0 = intervalA.getStartMillis();
     final double t1 = (intervalA.getEndMillis() - t0) / MILLIS_FACTOR;
     final double start = (intervalB.getStartMillis() - t0) / MILLIS_FACTOR;
     final double end = (intervalB.getEndMillis() - t0) / MILLIS_FACTOR;
 
-    // constant cost-multiplier for segments of the same datsource
-    final double multiplier = 
segmentA.getDataSource().equals(segmentB.getDataSource()) ? 2.0 : 1.0;
-
-    return INV_LAMBDA_SQUARE * intervalCost(t1, start, end) * multiplier;
+    return INV_LAMBDA_SQUARE * intervalCost(t1, start, end);
   }
 
   /**
@@ -199,161 +222,119 @@ public class CostBalancerStrategy implements 
BalancerStrategy
 
   @Override
   public Iterator<ServerHolder> findServersToLoadSegment(
-      DataSegment proposalSegment,
+      DataSegment segmentToLoad,
       List<ServerHolder> serverHolders
   )
   {
-    return getServersByPlacementCost(proposalSegment, serverHolders, false, 
"findServersToLoadSegment");
+    return orderServersByPlacementCost(segmentToLoad, serverHolders, 
SegmentAction.LOAD)
+        .stream()
+        .filter(server -> server.canLoadSegment(segmentToLoad))
+        .iterator();
   }
 
-
   @Override
   public ServerHolder findDestinationServerToMoveSegment(
-      DataSegment proposalSegment,
+      DataSegment segmentToMove,
       ServerHolder sourceServer,
       List<ServerHolder> serverHolders
   )
   {
-    Iterator<ServerHolder> servers =
-        getServersByPlacementCost(proposalSegment, serverHolders, true, 
"findServerToMoveSegment");
-    return servers.hasNext() ? servers.next() : null;
-  }
-
-  public static double computeJointSegmentsCost(DataSegment segment, 
Iterable<DataSegment> segmentSet)
-  {
-    double totalCost = 0;
-    for (DataSegment s : segmentSet) {
-      totalCost += computeJointSegmentsCost(segment, s);
+    List<ServerHolder> servers =
+        orderServersByPlacementCost(segmentToMove, serverHolders, 
SegmentAction.MOVE_TO);
+    if (servers.isEmpty()) {
+      return null;
     }
-    return totalCost;
+
+    ServerHolder candidateServer = servers.get(0);
+    return candidateServer.equals(sourceServer) ? null : candidateServer;
   }
 
   @Override
-  public Iterator<ServerHolder> pickServersToDropSegment(
+  public Iterator<ServerHolder> findServersToDropSegment(
       DataSegment segmentToDrop,
-      NavigableSet<ServerHolder> serverHolders
+      List<ServerHolder> serverHolders
   )
   {
-    List<ServerHolder> serversByCost = Lists.newArrayList(
-        getServersByPlacementCost(segmentToDrop, serverHolders, true, 
"pickServersToDropSegment")
-    );
+    List<ServerHolder> serversByCost =
+        orderServersByPlacementCost(segmentToDrop, serverHolders, 
SegmentAction.DROP);
 
     // Prioritize drop from highest cost servers
     return Lists.reverse(serversByCost).iterator();
   }
 
-  /**
-   * Calculates the initial cost of the Druid segment configuration.
-   *
-   * @param serverHolders A list of ServerHolders for a particular tier.
-   *
-   * @return The initial cost of the Druid tier.
-   */
-  public double calculateInitialTotalCost(final List<ServerHolder> 
serverHolders)
+  @Override
+  public CoordinatorRunStats getAndResetStats()
   {
-    double cost = 0;
-    for (ServerHolder server : serverHolders) {
-      // segments are dumped into an array because it's probably better than 
iterating the iterateAllSegments() result
-      // quadratically in a loop, which can generate garbage in the form of 
Stream, Spliterator, Iterator, etc. objects
-      // whose total memory volume exceeds the size of the DataSegment array.
-      DataSegment[] segments = 
server.getServer().iterateAllSegments().toArray(new DataSegment[0]);
-      for (DataSegment s1 : segments) {
-        for (DataSegment s2 : segments) {
-          cost += computeJointSegmentsCost(s1, s2);
-        }
-      }
-    }
-    return cost;
+    return stats.getSnapshotAndReset();
   }
 
   /**
-   * Calculates the cost normalization.  This is such that the normalized cost 
is lower bounded
-   * by 1 (e.g. when each segment gets its own historical node).
-   *
-   * @param serverHolders A list of ServerHolders for a particular tier.
-   *
-   * @return The normalization value (the sum of the diagonal entries in the
-   * pairwise cost matrix).  This is the cost of a cluster if each
-   * segment were to get its own historical node.
+   * Computes the cost of placing a segment on this server.
    */
-  public double calculateNormalization(final List<ServerHolder> serverHolders)
-  {
-    double cost = 0;
-    for (ServerHolder server : serverHolders) {
-      for (DataSegment segment : server.getServedSegments()) {
-        cost += computeJointSegmentsCost(segment, segment);
-      }
-    }
-    return cost;
-  }
-
-  @Override
-  public void emitStats(String tier, CoordinatorRunStats stats, List<ServerHolder> serverHolderList)
-  {
-    final double initialTotalCost = calculateInitialTotalCost(serverHolderList);
-    final double normalization = calculateNormalization(serverHolderList);
-    final double normalizedInitialCost = initialTotalCost / normalization;
-
-    final RowKey rowKey = RowKey.forTier(tier);
-    stats.add(Stats.Balancer.RAW_COST, rowKey, (long) initialTotalCost);
-    stats.add(Stats.Balancer.NORMALIZATION_COST, rowKey, (long) normalization);
-    stats.add(Stats.Balancer.NORMALIZED_COST_X_1000, rowKey, (long) (normalizedInitialCost * 1000));
-
-    log.info(
-        "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]",
-        tier, initialTotalCost, normalization, normalizedInitialCost
-    );
-  }
-
-  protected double computeCost(
-      final DataSegment proposalSegment,
-      final ServerHolder server,
-      final boolean includeCurrentServer
-  )
+  protected double computePlacementCost(DataSegment proposalSegment, ServerHolder server)
   {
-    // (optional) Don't include server if it cannot load the segment
-    if (!includeCurrentServer && !server.canLoadSegment(proposalSegment)) {
-      return Double.POSITIVE_INFINITY;
-    }
+    final Interval costComputeInterval = getCostComputeInterval(proposalSegment);
 
-    // The contribution to the total cost of a given server by proposing to move the segment to that server is...
-    double cost = 0d;
+    // Compute number of segments in each interval
+    final Object2IntOpenHashMap<Interval> intervalToSegmentCount = new Object2IntOpenHashMap<>();
 
-    // the sum of the costs of segments expected to be on the server (loaded + loading - dropping)
-    Set<DataSegment> projectedSegments = server.getProjectedSegments();
-    cost += computeJointSegmentsCost(proposalSegment, projectedSegments);
+    final SegmentCountsPerInterval projectedSegments = server.getProjectedSegments();
+    projectedSegments.getIntervalToTotalSegmentCount().object2IntEntrySet().forEach(entry -> {
+      final Interval interval = entry.getKey();
+      if (costComputeInterval.overlaps(interval)) {
+        intervalToSegmentCount.addTo(interval, entry.getIntValue());
+      }
+    });
+
+    // Count the segments for the same datasource twice as they have twice the cost
+    final String datasource = proposalSegment.getDataSource();
+    projectedSegments.getIntervalToSegmentCount(datasource).object2IntEntrySet().forEach(entry -> {
+      final Interval interval = entry.getKey();
+      if (costComputeInterval.overlaps(interval)) {
+        intervalToSegmentCount.addTo(interval, entry.getIntValue());
+      }
+    });
 
-    // minus the self cost of the segment
-    if (projectedSegments.contains(proposalSegment)) {
-      cost -= computeJointSegmentsCost(proposalSegment, proposalSegment);
+    // Compute joint cost for each interval
+    double cost = 0;
+    final Interval segmentInterval = proposalSegment.getInterval();
+    cost += intervalToSegmentCount.object2IntEntrySet().stream().mapToDouble(
+        entry -> intervalCost(segmentInterval, entry.getKey())
+                 * entry.getIntValue()
+    ).sum();
+
+    // Minus the self cost of the segment
+    if (server.isProjectedSegment(proposalSegment)) {
+      cost -= intervalCost(segmentInterval, segmentInterval) * 2.0;
     }
 
     return cost;
   }
 
   /**
-   * Returns an iterator over the servers, ordered by increasing cost for
-   * placing the given segment on that server.
-   *
-   * @param includeCurrentServer true if the server already serving a replica
-   *                             of this segment should be included in the results
+   * Orders the servers by increasing cost for placing the given segment.
    */
-  private Iterator<ServerHolder> getServersByPlacementCost(
-      DataSegment proposalSegment,
-      Iterable<ServerHolder> serverHolders,
-      boolean includeCurrentServer,
-      String action
+  private List<ServerHolder> orderServersByPlacementCost(
+      DataSegment segment,
+      List<ServerHolder> serverHolders,
+      SegmentAction action
   )
   {
+    final Stopwatch computeTime = Stopwatch.createStarted();
     final List<ListenableFuture<Pair<Double, ServerHolder>>> futures = new ArrayList<>();
     for (ServerHolder server : serverHolders) {
       futures.add(
           exec.submit(
-              () -> Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server)
+              () -> Pair.of(computePlacementCost(segment, server), server)
           )
       );
     }
 
+    String tier = serverHolders.isEmpty() ? null : serverHolders.get(0).getServer().getTier();
+    final RowKey metricKey = RowKey.with(Dimension.TIER, tier)
+                                   .with(Dimension.DATASOURCE, segment.getDataSource())
+                                   .and(Dimension.DESCRIPTION, action.name());
+
     final PriorityQueue<Pair<Double, ServerHolder>> costPrioritizedServers =
         new PriorityQueue<>(CHEAPEST_SERVERS_FIRST);
     try {
@@ -364,30 +345,57 @@ public class CostBalancerStrategy implements BalancerStrategy
       );
     }
     catch (Exception e) {
-      alertOnFailure(e, action);
+      stats.add(Stats.Balancer.COMPUTATION_ERRORS, metricKey, 1);
+      handleFailure(e, segment, action);
     }
 
-    // Include current server only if specified
-    return costPrioritizedServers.stream()
-                      .filter(pair -> includeCurrentServer || pair.rhs.canLoadSegment(proposalSegment))
-                      .map(pair -> pair.rhs).iterator();
+    // Report computation stats
+    computeTime.stop();
+    stats.add(Stats.Balancer.COMPUTATION_COUNT, metricKey, 1);
+    stats.add(Stats.Balancer.COMPUTATION_TIME, metricKey, computeTime.elapsed(TimeUnit.MILLISECONDS));
+
+    return costPrioritizedServers.stream().map(pair -> pair.rhs)
+                                 .collect(Collectors.toList());
   }
 
-  private void alertOnFailure(Exception e, String action)
+  private void handleFailure(
+      Exception e,
+      DataSegment segment,
+      SegmentAction action
+  )
   {
-    // Do not alert if the executor has been shutdown
+    final String reason;
+    String suggestion = "";
     if (exec.isShutdown()) {
-      log.noStackTrace().info("Balancer executor was terminated. Failing action [%s].", action);
-      return;
+      reason = "Executor shutdown";
+    } else if (e instanceof TimeoutException) {
+      reason = "Timed out";
+      suggestion = " Try setting a higher value for 'balancerComputeThreads'.";
+    } else {
+      reason = e.getMessage();
     }
 
-    final boolean hasTimedOut = e instanceof TimeoutException;
-    final String message = StringUtils.format(
-        "Cost balancer strategy %s in action [%s].%s",
-        hasTimedOut ? "timed out" : "failed", action,
-        hasTimedOut ? " Try setting a higher value of 'balancerComputeThreads'." : ""
-    );
-    log.makeAlert(e, message).emit();
+    String msgFormat = "Cost strategy computations failed for action[%s] on segment[%s] due to reason[%s].[%s]";
+    log.noStackTrace().warn(e, msgFormat, action, segment.getId(), reason, suggestion);
+  }
+
+  /**
+   * The cost compute interval for a segment is {@code [start-45days, end+45days)}.
+   * This is because the joint cost of any two segments that are 45 days apart is
+   * negligible.
+   */
+  private static Interval getCostComputeInterval(DataSegment segment)
+  {
+    final Interval segmentInterval = segment.getInterval();
+    if (Intervals.isEternity(segmentInterval)) {
+      return segmentInterval;
+    } else {
+      final long maxGap = TimeUnit.DAYS.toMillis(45);
+      return Intervals.utc(
+          segmentInterval.getStartMillis() - maxGap,
+          segmentInterval.getEndMillis() + maxGap
+      );
+    }
   }
 
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java
index cee292930c..601e5b042e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java
@@ -35,13 +35,12 @@ public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy
    * This ensures that all the hosts will have the same % disk utilization.
    */
   @Override
-  protected double computeCost(
+  protected double computePlacementCost(
       final DataSegment proposalSegment,
-      final ServerHolder server,
-      final boolean includeCurrentServer
+      final ServerHolder server
   )
   {
-    double cost = super.computeCost(proposalSegment, server, includeCurrentServer);
+    double cost = super.computePlacementCost(proposalSegment, server);
 
     if (cost == Double.POSITIVE_INFINITY) {
       return cost;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
index 98b1d8bdc7..cccc2518e8 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
@@ -27,7 +27,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NavigableSet;
 import java.util.stream.Collectors;
 
 /**
@@ -39,16 +38,18 @@ import java.util.stream.Collectors;
  */
 public class RandomBalancerStrategy implements BalancerStrategy
 {
+  private static final CoordinatorRunStats EMPTY_STATS = new CoordinatorRunStats();
+
   @Override
   public Iterator<ServerHolder> findServersToLoadSegment(
-      DataSegment proposalSegment,
+      DataSegment segmentToLoad,
       List<ServerHolder> serverHolders
   )
   {
     // Filter out servers which cannot load this segment
     final List<ServerHolder> usableServerHolders =
         serverHolders.stream()
-                     .filter(server -> server.canLoadSegment(proposalSegment))
+                     .filter(server -> server.canLoadSegment(segmentToLoad))
                      .collect(Collectors.toList());
     Collections.shuffle(usableServerHolders);
     return usableServerHolders.iterator();
@@ -56,7 +57,7 @@ public class RandomBalancerStrategy implements BalancerStrategy
 
   @Override
   public ServerHolder findDestinationServerToMoveSegment(
-      DataSegment proposalSegment,
+      DataSegment segmentToMove,
       ServerHolder sourceServer,
       List<ServerHolder> serverHolders
   )
@@ -66,7 +67,7 @@ public class RandomBalancerStrategy implements BalancerStrategy
   }
 
   @Override
-  public Iterator<ServerHolder> pickServersToDropSegment(DataSegment toDropSegment, NavigableSet<ServerHolder> serverHolders)
+  public Iterator<ServerHolder> findServersToDropSegment(DataSegment segmentToDrop, List<ServerHolder> serverHolders)
   {
     List<ServerHolder> serverList = new ArrayList<>(serverHolders);
     Collections.shuffle(serverList);
@@ -74,7 +75,8 @@ public class RandomBalancerStrategy implements BalancerStrategy
   }
 
   @Override
-  public void emitStats(String tier, CoordinatorRunStats stats, List<ServerHolder> serverHolderList)
+  public CoordinatorRunStats getAndResetStats()
   {
+    return EMPTY_STATS;
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
index a5ab8d0d47..86e5b99629 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.server.coordinator.balancer;
 
-import com.google.common.collect.Lists;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@@ -56,11 +55,9 @@ public class TierSegmentBalancer
   private final DruidCoordinatorRuntimeParams params;
   private final StrategicSegmentAssigner segmentAssigner;
 
-  private final BalancerStrategy strategy;
   private final SegmentLoadingConfig loadingConfig;
   private final CoordinatorRunStats runStats;
 
-  private final Set<ServerHolder> allServers;
   private final List<ServerHolder> activeServers;
   private final List<ServerHolder> decommissioningServers;
   private final int totalMaxSegmentsToMove;
@@ -77,7 +74,6 @@ public class TierSegmentBalancer
     this.params = params;
     this.segmentAssigner = params.getSegmentAssigner();
 
-    this.strategy = params.getBalancerStrategy();
     this.loadingConfig = params.getSegmentLoadingConfig();
     this.totalMaxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();
     this.runStats = segmentAssigner.getStats();
@@ -86,7 +82,6 @@ public class TierSegmentBalancer
         servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioning));
     this.decommissioningServers = partitions.get(true);
     this.activeServers = partitions.get(false);
-    this.allServers = servers;
 
     this.movingSegmentCount = activeServers.stream().mapToInt(ServerHolder::getNumMovingSegments).sum();
   }
@@ -128,10 +123,6 @@ public class TierSegmentBalancer
         "Moved [%d] segments out of max [%d] between active servers in tier [%s].",
         movedGeneralSegments, maxGeneralSegmentsToMove, tier
     );
-
-    if (loadingConfig.isEmitBalancingStats()) {
-      strategy.emitStats(tier, runStats, Lists.newArrayList(allServers));
-    }
   }
 
   private int moveSegmentsFromTo(
@@ -224,13 +215,9 @@ public class TierSegmentBalancer
 
   private void markUnmoved(String reason, DataSegment segment)
   {
-    final RowKey key
-        = RowKey.builder()
-                .add(Dimension.TIER, tier)
-                .add(Dimension.DATASOURCE, segment.getDataSource())
-                .add(Dimension.DESCRIPTION, reason)
-                .build();
-
+    RowKey key = RowKey.with(Dimension.TIER, tier)
+                       .with(Dimension.DATASOURCE, segment.getDataSource())
+                       .and(Dimension.DESCRIPTION, reason);
     runStats.add(Stats.Segments.MOVE_SKIPPED, key, 1);
   }
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
index 865fde2a0b..4ff2e65743 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
@@ -24,6 +24,7 @@ import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.balancer.TierSegmentBalancer;
 import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 
 /**
  *
@@ -57,6 +58,11 @@ public class BalanceSegments implements CoordinatorDuty
         (tier, servers) -> new TierSegmentBalancer(tier, servers, params).run()
     );
 
+    CoordinatorRunStats runStats = params.getCoordinatorStats();
+    params.getBalancerStrategy()
+          .getAndResetStats()
+          .forEachStat(runStats::add);
+
     return params;
   }
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
index b5825cb084..5109ec3c3f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
@@ -72,7 +72,7 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty
 
     final DruidCluster cluster = params.getDruidCluster();
     cluster.getHistoricals().forEach((tier, historicals) -> {
-      final RowKey rowKey = RowKey.forTier(tier);
+      final RowKey rowKey = RowKey.of(Dimension.TIER, tier);
       stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size());
       long totalCapacity = historicals.stream().map(ServerHolder::getMaxSize).reduce(0L, Long::sum);
       stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity);
@@ -80,20 +80,23 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty
 
     // Collect load queue stats
     coordinator.getLoadManagementPeons().forEach((serverName, queuePeon) -> {
-      final RowKey rowKey = RowKey.builder().add(Dimension.SERVER, serverName).build();
+      final RowKey rowKey = RowKey.of(Dimension.SERVER, serverName);
       stats.add(Stats.SegmentQueue.BYTES_TO_LOAD, rowKey, queuePeon.getSizeOfSegmentsToLoad());
       stats.add(Stats.SegmentQueue.NUM_TO_LOAD, rowKey, queuePeon.getSegmentsToLoad().size());
       stats.add(Stats.SegmentQueue.NUM_TO_DROP, rowKey, queuePeon.getSegmentsToDrop().size());
 
       queuePeon.getAndResetStats().forEachStat(
-          (dimValues, stat, statValue) ->
-              stats.add(stat, createRowKeyForServer(serverName, dimValues), statValue)
+          (stat, key, statValue) ->
+              stats.add(stat, createRowKeyForServer(serverName, key.getValues()), statValue)
       );
     });
 
     coordinator.getDatasourceToUnavailableSegmentCount().forEach(
-        (dataSource, numUnavailable) ->
-            stats.addToDatasourceStat(Stats.Segments.UNAVAILABLE, dataSource, numUnavailable)
+        (dataSource, numUnavailable) -> stats.add(
+            Stats.Segments.UNAVAILABLE,
+            RowKey.of(Dimension.DATASOURCE, dataSource),
+            numUnavailable
+        )
     );
 
     coordinator.getTierToDatasourceToUnderReplicatedCount(false).forEach(
@@ -108,17 +111,18 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty
         (dataSource, timeline) -> {
           long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream()
                                                  
.mapToLong(DataSegment::getSize).sum();
-          stats.addToDatasourceStat(Stats.Segments.USED_BYTES, dataSource, totalSizeOfUsedSegments);
-          stats.addToDatasourceStat(Stats.Segments.USED, dataSource, timeline.getNumObjects());
+
+          RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
+          stats.add(Stats.Segments.USED_BYTES, datasourceKey, totalSizeOfUsedSegments);
+          stats.add(Stats.Segments.USED, datasourceKey, timeline.getNumObjects());
         }
     );
   }
 
   private RowKey createRowKeyForServer(String serverName, Map<Dimension, String> dimensionValues)
   {
-    final RowKey.Builder builder = RowKey.builder();
-    dimensionValues.forEach(builder::add);
-    builder.add(Dimension.SERVER, serverName);
+    final RowKey.Builder builder = RowKey.with(Dimension.SERVER, serverName);
+    dimensionValues.forEach(builder::with);
     return builder.build();
   }
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index d767c51a92..0381a70ba2 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -43,6 +43,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
 import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
@@ -593,7 +594,7 @@ public class CompactSegments implements CoordinatorCustomDuty
       CoordinatorRunStats stats
   )
   {
-    final RowKey rowKey = RowKey.forDatasource(dataSource);
+    final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, dataSource);
 
     stats.add(Stats.Compaction.PENDING_BYTES, rowKey, autoCompactionSnapshot.getBytesAwaitingCompaction());
     stats.add(Stats.Compaction.PENDING_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountAwaitingCompaction());
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java
index 1a63083bf7..3cb9f0064c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java
@@ -28,6 +28,8 @@ import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
@@ -100,7 +102,8 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty
     final CoordinatorRunStats stats = params.getCoordinatorStats();
     datasourceToUnusedSegments.forEach(
         (datasource, unusedSegments) -> {
-          stats.addToDatasourceStat(Stats.Segments.OVERSHADOWED, datasource, unusedSegments.size());
+          RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, datasource);
+          stats.add(Stats.Segments.OVERSHADOWED, datasourceKey, unusedSegments.size());
           coordinator.markSegmentsAsUnused(datasource, unusedSegments);
         }
     );
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
index eb037d1212..a5c4eb5804 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
@@ -67,7 +67,7 @@ public class RunRules implements CoordinatorDuty
     final Set<DataSegment> overshadowed = params.getDataSourcesSnapshot().getOvershadowedSegments();
     final Set<DataSegment> usedSegments = params.getUsedSegments();
     log.info(
-        "Applying retention rules on [%d] used segments, skipping [%d] overshadowed segments.",
+        "Applying retention rules on [%,d] used segments, skipping [%,d] overshadowed segments.",
         usedSegments.size(), overshadowed.size()
     );
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
index 9c133fce8c..92dd4714a4 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
@@ -40,6 +40,8 @@ import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.CoordinatorStat;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
@@ -543,7 +545,9 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
 
   private void incrementStat(SegmentHolder holder, RequestStatus status)
   {
-    stats.addToDatasourceStat(status.datasourceStat, holder.getSegment().getDataSource(), 1);
+    RowKey rowKey = RowKey.with(Dimension.DATASOURCE, holder.getSegment().getDataSource())
+                          .and(Dimension.DESCRIPTION, holder.getAction().name());
+    stats.add(status.datasourceStat, rowKey, 1);
   }
 
   private void executeCallbacks(SegmentHolder holder, boolean success)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
index 559950c18e..fbf867fd91 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
@@ -61,7 +61,7 @@ public class SegmentLoadingConfig
 
       log.info(
           "Smart segment loading is enabled. Recomputed replicationThrottleLimit"
-          + " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove [%d].",
+          + " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove [%,d].",
           replicationThrottleLimit, throttlePercentage, numUsedSegments, maxSegmentsToMove
       );
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
index 4fc2e176f7..6546cce69c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
@@ -33,6 +33,7 @@ import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -296,7 +297,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
 
   private void reportTierCapacityStats(DataSegment segment, int requiredReplicas, String tier)
   {
-    final RowKey rowKey = RowKey.forTier(tier);
+    final RowKey rowKey = RowKey.of(Dimension.TIER, tier);
     stats.updateMax(Stats.Tier.REPLICATION_FACTOR, rowKey, requiredReplicas);
     stats.add(Stats.Tier.REQUIRED_CAPACITY, rowKey, segment.getSize() * requiredReplicas);
   }
@@ -342,7 +343,8 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
   public void deleteSegment(DataSegment segment)
   {
     loadQueueManager.deleteSegment(segment);
-    stats.addToDatasourceStat(Stats.Segments.DELETED, segment.getDataSource(), 1);
+    RowKey rowKey = RowKey.of(Dimension.DATASOURCE, segment.getDataSource());
+    stats.add(Stats.Segments.DELETED, rowKey, 1);
   }
 
   /**
@@ -429,9 +431,9 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
     if (numToDrop > numDropsQueued) {
       remainingNumToDrop = numToDrop - numDropsQueued;
       Iterator<ServerHolder> serverIterator =
-          (useRoundRobinAssignment || eligibleLiveServers.size() >= remainingNumToDrop)
+          (useRoundRobinAssignment || eligibleLiveServers.size() <= remainingNumToDrop)
           ? eligibleLiveServers.iterator()
-          : strategy.pickServersToDropSegment(segment, eligibleLiveServers);
+          : strategy.findServersToDropSegment(segment, new ArrayList<>(eligibleLiveServers));
       numDropsQueued += dropReplicasFromServers(remainingNumToDrop, segment, serverIterator, tier);
     }
 
@@ -493,7 +495,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
         ? serverSelector.getServersInTierToLoadSegment(tier, segment)
         : strategy.findServersToLoadSegment(segment, eligibleServers);
     if (!serverIterator.hasNext()) {
-      incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "No server chosen by strategy", segment, tier);
+      incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "No strategic server", segment, tier);
       return 0;
     }
 
@@ -586,16 +588,10 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
 
   private void incrementSkipStat(CoordinatorStat stat, String reason, DataSegment segment, String tier)
   {
-    final RowKey.Builder keyBuilder
-        = RowKey.builder()
-                .add(Dimension.TIER, tier)
-                .add(Dimension.DATASOURCE, segment.getDataSource());
-
-    if (reason != null) {
-      keyBuilder.add(Dimension.DESCRIPTION, reason);
-    }
-
-    stats.add(stat, keyBuilder.build(), 1);
+    final RowKey key = RowKey.with(Dimension.TIER, tier)
+                             .with(Dimension.DATASOURCE, segment.getDataSource())
+                             .and(Dimension.DESCRIPTION, reason);
+    stats.add(stat, key, 1);
   }
 
   private void incrementStat(CoordinatorStat stat, DataSegment segment, String tier, long value)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
index 7d63c78c47..f8ea6cf98f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
@@ -64,12 +64,7 @@ public class CoordinatorRunStats
 
   public long getSegmentStat(CoordinatorStat stat, String tier, String datasource)
   {
-    return get(stat, RowKey.builder().add(Dimension.DATASOURCE, datasource).add(Dimension.TIER, tier).build());
-  }
-
-  public long getDataSourceStat(CoordinatorStat stat, String dataSource)
-  {
-    return get(stat, RowKey.forDatasource(dataSource));
+    return get(stat, RowKey.with(Dimension.DATASOURCE, datasource).and(Dimension.TIER, tier));
   }
 
   public long get(CoordinatorStat stat)
@@ -87,7 +82,7 @@ public class CoordinatorRunStats
   {
     allStats.forEach(
         (rowKey, stats) -> stats.object2LongEntrySet().fastForEach(
-            stat -> handler.handle(rowKey.getValues(), stat.getKey(), stat.getLongValue())
+            stat -> handler.handle(stat.getKey(), rowKey, stat.getLongValue())
         )
     );
   }
@@ -199,16 +194,10 @@ public class CoordinatorRunStats
             .addTo(stat, value);
   }
 
-  public void addToDatasourceStat(CoordinatorStat stat, String dataSource, long value)
-  {
-    add(stat, RowKey.forDatasource(dataSource), value);
-  }
-
   public void addToSegmentStat(CoordinatorStat stat, String tier, String datasource, long value)
   {
-    RowKey rowKey = RowKey.builder()
-                          .add(Dimension.TIER, tier)
-                          .add(Dimension.DATASOURCE, datasource).build();
+    RowKey rowKey = RowKey.with(Dimension.TIER, tier)
+                          .and(Dimension.DATASOURCE, datasource);
     add(stat, rowKey, value);
   }
 
@@ -267,7 +256,7 @@ public class CoordinatorRunStats
 
   public interface StatHandler
   {
-    void handle(Map<Dimension, String> dimensionValues, CoordinatorStat stat, long statValue);
+    void handle(CoordinatorStat stat, RowKey rowKey, long statValue);
   }
 
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java
index 1893f86a9c..b0ee0a2d1f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java
@@ -40,19 +40,16 @@ public class RowKey
     this.hashCode = Objects.hash(values);
   }
 
-  public static Builder builder()
+  public static Builder with(Dimension dimension, String value)
   {
-    return new RowKey.Builder();
+    Builder builder = new Builder();
+    builder.with(dimension, value);
+    return builder;
   }
 
-  public static RowKey forTier(String tier)
+  public static RowKey of(Dimension dimension, String value)
   {
-    return RowKey.builder().add(Dimension.TIER, tier).build();
-  }
-
-  public static RowKey forDatasource(String datasource)
-  {
-    return RowKey.builder().add(Dimension.DATASOURCE, datasource).build();
+    return with(dimension, value).build();
   }
 
   public Map<Dimension, String> getValues()
@@ -83,12 +80,18 @@ public class RowKey
   {
     private final Map<Dimension, String> values = new 
EnumMap<>(Dimension.class);
 
-    public Builder add(Dimension dimension, String value)
+    public Builder with(Dimension dimension, String value)
     {
       values.put(dimension, value);
       return this;
     }
 
+    public RowKey and(Dimension dimension, String value)
+    {
+      values.put(dimension, value);
+      return new RowKey(values);
+    }
+
     public RowKey build()
     {
       return new RowKey(values);
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
index 791d3963dd..9864aa6b3a 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
@@ -131,11 +131,11 @@ public class Stats
 
   public static class Balancer
   {
-    public static final CoordinatorStat RAW_COST
-        = new CoordinatorStat("initialCost", "segment/cost/raw");
-    public static final CoordinatorStat NORMALIZATION_COST
-        = new CoordinatorStat("normaliznCost", "segment/cost/normalization");
-    public static final CoordinatorStat NORMALIZED_COST_X_1000
-        = new CoordinatorStat("normalizedCost", "segment/cost/normalized");
+    public static final CoordinatorStat COMPUTATION_ERRORS
+        = new CoordinatorStat("costComputeError", 
"segment/balancer/compute/error");
+    public static final CoordinatorStat COMPUTATION_TIME
+        = new CoordinatorStat("costComputeTime", 
"segment/balancer/compute/time");
+    public static final CoordinatorStat COMPUTATION_COUNT
+        = new CoordinatorStat("costComputeCount", 
"segment/balancer/compute/count");
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
index 3354080069..c2c77f02cb 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
@@ -161,14 +161,14 @@ public class CoordinatorRunStatsTest
     );
     expected.forEach(
         (duty, count) ->
-            stats.add(STAT_1, RowKey.builder().add(Dimension.DUTY, 
duty).build(), count)
+            stats.add(STAT_1, RowKey.of(Dimension.DUTY, duty), count)
     );
 
     final Map<String, Long> actual = new HashMap<>();
     stats.forEachStat(
-        (dimensionValues, stat, value) -> {
+        (stat, rowKey, value) -> {
           if (stat.equals(STAT_1)) {
-            actual.put(dimensionValues.get(Dimension.DUTY), value);
+            actual.put(rowKey.getValues().get(Dimension.DUTY), value);
           }
         }
     );
@@ -192,7 +192,7 @@ public class CoordinatorRunStatsTest
     debugStats.add(DEBUG_STAT_1, Key.TIER_1, 1);
     Assert.assertTrue(debugStats.hasStat(DEBUG_STAT_1));
 
-    debugStats.addToDatasourceStat(DEBUG_STAT_2, "wiki", 1);
+    debugStats.add(DEBUG_STAT_2, RowKey.of(Dimension.DATASOURCE, "wiki"), 1);
     Assert.assertFalse(debugStats.hasStat(DEBUG_STAT_2));
   }
 
@@ -201,11 +201,11 @@ public class CoordinatorRunStatsTest
    */
   private static class Key
   {
-    static final RowKey TIER_1 = RowKey.forTier("tier1");
-    static final RowKey TIER_2 = RowKey.forTier("tier2");
+    static final RowKey TIER_1 = RowKey.of(Dimension.TIER, "tier1");
+    static final RowKey TIER_2 = RowKey.of(Dimension.TIER, "tier2");
 
-    static final RowKey DUTY_1 = RowKey.builder().add(Dimension.DUTY, 
"duty1").build();
-    static final RowKey DUTY_2 = RowKey.builder().add(Dimension.DUTY, 
"duty2").build();
+    static final RowKey DUTY_1 = RowKey.of(Dimension.DUTY, "duty1");
+    static final RowKey DUTY_2 = RowKey.of(Dimension.DUTY, "duty2");
   }
 
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
index f4cad1cb5c..23975352b3 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
@@ -38,6 +38,7 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -102,7 +103,7 @@ public class CachingCostBalancerStrategyTest
                   .findDestinationServerToMoveSegment(s, firstServer, 
serverHolderList);
               ServerHolder s2 = costBalancerStrategy
                   .findDestinationServerToMoveSegment(s, firstServer, 
serverHolderList);
-              return 
(s1.getServer().getName().equals(s2.getServer().getName())) ? 0 : 1;
+              return Objects.equals(s1, s2) ? 0 : 1;
             }
         )
         .sum();
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
index 6580425764..17d0a8716a 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
@@ -21,17 +21,19 @@ package org.apache.druid.server.coordinator.balancer;
 
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.core.Event;
-import org.apache.druid.java.util.emitter.service.AlertEvent;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
-import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 import org.junit.After;
 import org.junit.Assert;
@@ -61,7 +63,7 @@ public class CostBalancerStrategyTest
   @Before
   public void setup()
   {
-    balancerExecutor = new BlockingExecutorService("test-balance-exec-%d");
+    balancerExecutor = Execs.singleThreaded("test-balance-exec-%d");
     strategy = new 
CostBalancerStrategy(MoreExecutors.listeningDecorator(balancerExecutor));
 
     serviceEmitter = new StubServiceEmitter("test-service", "host");
@@ -241,7 +243,7 @@ public class CostBalancerStrategyTest
   }
 
   @Test
-  public void testComputeCost()
+  public void testComputePlacementCost()
   {
     // Create segments for different granularities
     final List<DataSegment> daySegments =
@@ -265,7 +267,7 @@ public class CostBalancerStrategyTest
                           .withNumPartitions(30)
                           .eachOfSizeInMb(100);
 
-    // Distribute the segments randomly amongst 2 servers
+    // Distribute the segments randomly amongst 3 servers
     final List<DataSegment> segments = new ArrayList<>(daySegments);
     segments.addAll(monthSegments);
     segments.addAll(yearSegments);
@@ -284,35 +286,66 @@ public class CostBalancerStrategyTest
         server -> new ServerHolder(server.toImmutableDruidServer(), new 
LoadQueuePeonTester())
     ).collect(Collectors.toList());
 
+    final ServerHolder serverA = serverHolders.get(0);
+    final ServerHolder serverB = serverHolders.get(1);
+    final ServerHolder serverC = serverHolders.get(2);
+
     // Verify costs for DAY, MONTH and YEAR segments
-    verifyServerCosts(
-        daySegments.get(0),
-        serverHolders,
-        5191.500804, 8691.392080, 6418.467818
-    );
-    verifyServerCosts(
-        monthSegments.get(0),
-        serverHolders,
-        301935.940609, 301935.940606, 304333.677669
-    );
-    verifyServerCosts(
-        yearSegments.get(0),
-        serverHolders,
-        8468764.380437, 12098919.896931, 14501440.169452
-    );
+    final DataSegment daySegment = daySegments.get(0);
+    verifyPlacementCost(daySegment, serverA, 5191.500804);
+    verifyPlacementCost(daySegment, serverB, 8691.392080);
+    verifyPlacementCost(daySegment, serverC, 6418.467818);
+
+    final DataSegment monthSegment = monthSegments.get(0);
+    verifyPlacementCost(monthSegment, serverA, 301935.940609);
+    verifyPlacementCost(monthSegment, serverB, 301935.940606);
+    verifyPlacementCost(monthSegment, serverC, 304333.677669);
+
+    final DataSegment yearSegment = yearSegments.get(0);
+    verifyPlacementCost(yearSegment, serverA, 8468764.380437);
+    verifyPlacementCost(yearSegment, serverB, 12098919.896931);
+    verifyPlacementCost(yearSegment, serverC, 14501440.169452);
 
     // Verify costs for an ALL granularity segment
-    DataSegment allGranularitySegment =
+    final DataSegment allGranularitySegment =
         CreateDataSegments.ofDatasource(DS_WIKI)
                           .forIntervals(1, Granularities.ALL)
                           .eachOfSizeInMb(100).get(0);
-    verifyServerCosts(
-        allGranularitySegment,
-        serverHolders,
-        1.1534173737329768e7,
-        1.6340633534241956e7,
-        1.9026400521582970e7
+    verifyPlacementCost(allGranularitySegment, serverA, 1.1534173737329768e7);
+    verifyPlacementCost(allGranularitySegment, serverB, 1.6340633534241956e7);
+    verifyPlacementCost(allGranularitySegment, serverC, 1.9026400521582970e7);
+  }
+
+  @Test
+  public void testGetAndResetStats()
+  {
+    final ServerHolder serverA = new ServerHolder(
+        createHistorical().toImmutableDruidServer(),
+        new LoadQueuePeonTester()
+    );
+    final ServerHolder serverB = new ServerHolder(
+        createHistorical().toImmutableDruidServer(),
+        new LoadQueuePeonTester()
     );
+
+    final DataSegment segment = 
CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0);
+
+    // Verify that computation stats have been tracked
+    strategy.findServersToLoadSegment(segment, Arrays.asList(serverA, 
serverB));
+    CoordinatorRunStats computeStats = strategy.getAndResetStats();
+
+    final RowKey rowKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI)
+                                .with(Dimension.DESCRIPTION, "LOAD")
+                                .and(Dimension.TIER, "hot");
+    Assert.assertEquals(1L, computeStats.get(Stats.Balancer.COMPUTATION_COUNT, 
rowKey));
+
+    long computeTime = computeStats.get(Stats.Balancer.COMPUTATION_TIME, 
rowKey);
+    Assert.assertTrue(computeTime >= 0 && computeTime <= 100);
+    
Assert.assertFalse(computeStats.hasStat(Stats.Balancer.COMPUTATION_ERRORS));
+
+    // Verify that stats have been reset
+    computeStats = strategy.getAndResetStats();
+    Assert.assertEquals(0, computeStats.rowCount());
   }
 
   @Test
@@ -334,42 +367,24 @@ public class CostBalancerStrategyTest
     );
   }
 
-  @Test(timeout = 90_000L)
-  public void testFindServerRaisesAlertOnTimeout()
+  /**
+   * Verifies that the cost of placing the segment on the server is as expected.
+   * Also verifies that this cost is equal to the total joint cost of this segment
+   * with each segment currently on the server.
+   */
+  private void verifyPlacementCost(DataSegment segment, ServerHolder server, 
double expectedCost)
   {
-    DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI)
-                                            .forIntervals(1, Granularities.DAY)
-                                            .startingAt("2012-10-24")
-                                            .eachOfSizeInMb(100).get(0);
-
-    final LoadQueuePeonTester peon = new LoadQueuePeonTester();
-    ServerHolder serverA = new 
ServerHolder(createHistorical().toImmutableDruidServer(), peon);
-    ServerHolder serverB = new 
ServerHolder(createHistorical().toImmutableDruidServer(), peon);
-
-    strategy.findServersToLoadSegment(segment, Arrays.asList(serverA, 
serverB));
-
-    List<Event> events = serviceEmitter.getEvents();
-    Assert.assertEquals(1, events.size());
-    Assert.assertTrue(events.get(0) instanceof AlertEvent);
-
-    AlertEvent alertEvent = (AlertEvent) events.get(0);
-    Assert.assertEquals(
-        "Cost balancer strategy timed out in action 
[findServersToLoadSegment]."
-        + " Try setting a higher value of 'balancerComputeThreads'.",
-        alertEvent.getDescription()
-    );
-  }
+    double observedCost = strategy.computePlacementCost(segment, server);
+    Assert.assertEquals(expectedCost, observedCost, DELTA);
 
-  private void verifyServerCosts(
-      DataSegment segment,
-      List<ServerHolder> serverHolders,
-      double... expectedCosts
-  )
-  {
-    for (int i = 0; i < serverHolders.size(); ++i) {
-      double observedCost = strategy.computeCost(segment, 
serverHolders.get(i), true);
-      Assert.assertEquals(expectedCosts[i], observedCost, DELTA);
+    double totalJointSegmentCost = 0;
+    for (DataSegment segmentOnServer : 
server.getServer().iterateAllSegments()) {
+      totalJointSegmentCost += 
CostBalancerStrategy.computeJointSegmentsCost(segment, segmentOnServer);
+    }
+    if (server.isServingSegment(segment)) {
+      totalJointSegmentCost -= 
CostBalancerStrategy.computeJointSegmentsCost(segment, segment);
     }
+    Assert.assertEquals(totalJointSegmentCost, observedCost, DELTA);
   }
 
   private void verifyJointSegmentsCost(
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
index a506443e8e..39dcc9ce50 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
@@ -37,7 +37,6 @@ import 
org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.easymock.EasyMock;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.After;
@@ -132,13 +131,8 @@ public class BalanceSegmentsTest
   @Test
   public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove()
   {
-    final ServerHolder serverHolder1 = createHolder(server1, false, segment1, 
segment2);
-    final ServerHolder serverHolder2 = createHolder(server2, true, segment3, 
segment4);
-    final ServerHolder serverHolder3 = createHolder(server3, false);
-
-    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    expectFindDestinationAndReturn(strategy, serverHolder3);
-    EasyMock.replay(strategy);
+    final ServerHolder serverHolder1 = createHolder(server1, true, segment1, 
segment2, segment3, segment4);
+    final ServerHolder serverHolder2 = createHolder(server2, false);
 
     // ceil(3 * 0.6) = 2 segments from decommissioning servers
     CoordinatorDynamicConfig dynamicConfig =
@@ -148,24 +142,21 @@ public class BalanceSegmentsTest
                                 
.withDecommissioningMaxPercentOfMaxSegmentsToMove(60)
                                 .build();
     DruidCoordinatorRuntimeParams params =
-        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2, 
serverHolder3)
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2)
             .withDynamicConfigs(dynamicConfig)
-            .withBalancerStrategy(strategy)
+            .withBalancerStrategy(balancerStrategy)
             .withBroadcastDatasources(broadcastDatasources)
             .withSegmentAssignerUsing(loadQueueManager)
             .build();
 
     CoordinatorRunStats stats = runBalancer(params);
 
-    EasyMock.verify(strategy);
-
-    // 2 segments are moved from the decommissioning server and 1 from the active server
-    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", "datasource1"));
-    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", "datasource2"));
-    Set<DataSegment> segmentsMoved = 
serverHolder3.getPeon().getSegmentsToLoad();
-    Assert.assertTrue(segmentsMoved.contains(segment3));
-    Assert.assertTrue(segmentsMoved.contains(segment4));
-    Assert.assertTrue(segmentsMoved.contains(segment1) || 
segmentsMoved.contains(segment2));
+    // 2 segments are moved from the decommissioning server
+    long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1")
+                      + stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource2");
+    Assert.assertEquals(2L, totalMoved);
+    Set<DataSegment> segmentsMoved = 
serverHolder2.getPeon().getSegmentsToLoad();
+    Assert.assertEquals(2, segmentsMoved.size());
   }
 
   @Test
@@ -220,33 +211,27 @@ public class BalanceSegmentsTest
   @Test
   public void 
testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissioning()
   {
-    final ServerHolder serverHolder1 = createHolder(server1, segment1, 
segment2);
-    final ServerHolder serverHolder2 = createHolder(server2, segment3, 
segment4);
-    final ServerHolder serverHolder3 = createHolder(server3);
-
-    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    expectFindDestinationAndReturn(strategy, serverHolder3);
-    EasyMock.replay(strategy);
+    final ServerHolder serverHolder1 = createHolder(server1, segment1, 
segment2, segment3, segment4);
+    final ServerHolder serverHolder2 = createHolder(server2);
 
     CoordinatorDynamicConfig dynamicConfig =
         CoordinatorDynamicConfig.builder()
                                 .withSmartSegmentLoading(false)
-                                .withMaxSegmentsToMove(3)
+                                .withMaxSegmentsToMove(4)
                                 
.withDecommissioningMaxPercentOfMaxSegmentsToMove(9)
                                 .build();
     DruidCoordinatorRuntimeParams params =
-        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2, 
serverHolder3)
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2)
             .withDynamicConfigs(dynamicConfig)
-            .withBalancerStrategy(strategy)
+            .withBalancerStrategy(balancerStrategy)
             .withSegmentAssignerUsing(loadQueueManager)
             .build();
 
     CoordinatorRunStats stats = runBalancer(params);
-    EasyMock.verify(strategy);
     long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1")
                       + stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource2");
-    Assert.assertEquals(3L, totalMoved);
-    Assert.assertEquals(3, serverHolder3.getPeon().getSegmentsToLoad().size());
+    Assert.assertEquals(2L, totalMoved);
+    Assert.assertEquals(2, serverHolder2.getPeon().getSegmentsToLoad().size());
   }
 
   /**
@@ -258,18 +243,13 @@ public class BalanceSegmentsTest
     final ServerHolder activeServer = createHolder(server1, false, 
allSegments);
     final ServerHolder decommissioningServer = createHolder(server2, true);
 
-    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    expectFindDestinationAndReturn(strategy, decommissioningServer);
-    EasyMock.replay(strategy);
-
     DruidCoordinatorRuntimeParams params =
         defaultRuntimeParamsBuilder(activeServer, decommissioningServer)
-            .withBalancerStrategy(strategy)
+            .withBalancerStrategy(balancerStrategy)
             .withBroadcastDatasources(broadcastDatasources)
             .build();
 
     CoordinatorRunStats stats = runBalancer(params);
-    EasyMock.verify(strategy);
     Assert.assertFalse(stats.hasStat(Stats.Segments.MOVED));
   }
 
@@ -279,22 +259,17 @@ public class BalanceSegmentsTest
     final ServerHolder decommissioningServer = createHolder(server1, true, 
allSegments);
     final ServerHolder activeServer = createHolder(server2);
 
-    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    expectFindDestinationAndReturn(strategy, activeServer);
-    EasyMock.replay(strategy);
-
     DruidCoordinatorRuntimeParams params = 
defaultRuntimeParamsBuilder(decommissioningServer, activeServer)
         .withDynamicConfigs(
             CoordinatorDynamicConfig.builder()
                                     .withSmartSegmentLoading(false)
                                     .withMaxSegmentsToMove(3).build()
         )
-        .withBalancerStrategy(strategy)
+        .withBalancerStrategy(balancerStrategy)
         .withBroadcastDatasources(broadcastDatasources)
         .build();
 
     runBalancer(params);
-    EasyMock.verify(strategy);
     Assert.assertEquals(0, 
decommissioningServer.getPeon().getSegmentsToLoad().size());
     Assert.assertEquals(3, activeServer.getPeon().getSegmentsToLoad().size());
   }
@@ -358,10 +333,6 @@ public class BalanceSegmentsTest
     final ServerHolder holder2 = createHolder(server2, segment3, segment4);
     final ServerHolder holder3 = createHolder(server3);
 
-    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    expectFindDestinationAndReturn(strategy, holder3);
-    EasyMock.replay(strategy);
-
     DruidCoordinatorRuntimeParams params =
         defaultRuntimeParamsBuilder(holder1, holder2, holder3)
             .withDynamicConfigs(
@@ -372,12 +343,11 @@ public class BalanceSegmentsTest
                                         
.withPercentOfSegmentsToConsiderPerMove(40)
                                         .build()
             )
-            .withBalancerStrategy(strategy)
+            .withBalancerStrategy(balancerStrategy)
             .withBroadcastDatasources(broadcastDatasources)
             .build();
 
     CoordinatorRunStats stats = runBalancer(params);
-    EasyMock.verify(strategy);
     long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1")
                       + stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource2");
     Assert.assertEquals(1L, totalMoved);
@@ -463,17 +433,6 @@ public class BalanceSegmentsTest
     );
   }
 
-  private void expectFindDestinationAndReturn(BalancerStrategy strategy, 
ServerHolder chosenServer)
-  {
-    EasyMock.expect(
-        strategy.findDestinationServerToMoveSegment(
-            EasyMock.anyObject(),
-            EasyMock.anyObject(),
-            EasyMock.anyObject()
-        )
-    ).andReturn(chosenServer).anyTimes();
-  }
-
   private DataSegment createHourlySegment(String datasource, DateTime start, 
String version)
   {
     return new DataSegment(
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 14fcd05d00..7487d332d1 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -2037,7 +2037,7 @@ public class CompactSegmentsTest
       // all the datasources in the coordinator stats
       final AtomicInteger numDatasources = new AtomicInteger();
       stats.forEachStat(
-          (dimensionValues, stat, value) -> {
+          (stat, rowKey, value) -> {
             if (stat.equals(Stats.Compaction.PENDING_BYTES)
                 && (expectedRemainingSegments <= 0 || value == 
expectedRemainingSegments)) {
               numDatasources.incrementAndGet();
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
index cd9c1e228d..46240a340d 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
@@ -76,6 +76,7 @@ public class RunRulesTest
 {
   private static final long SERVER_SIZE_10GB = 10L << 30;
   private static final String DATASOURCE = "test";
+  private static final RowKey DATASOURCE_STAT_KEY = 
RowKey.of(Dimension.DATASOURCE, DATASOURCE);
 
   private LoadQueuePeon mockPeon;
   private RunRules ruleRunner;
@@ -563,7 +564,7 @@ public class RunRulesTest
         .build();
 
     CoordinatorRunStats stats = runDutyAndGetStats(params);
-    Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED, 
DATASOURCE));
+    Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED, 
DATASOURCE_STAT_KEY));
   }
 
   @Test
@@ -616,7 +617,7 @@ public class RunRulesTest
     CoordinatorRunStats stats = runDutyAndGetStats(params);
 
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, 
"normal", DATASOURCE));
-    Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED, 
DATASOURCE));
+    Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED, 
DATASOURCE_STAT_KEY));
 
     EasyMock.verify(mockPeon);
   }
@@ -662,7 +663,7 @@ public class RunRulesTest
 
     CoordinatorRunStats stats = runDutyAndGetStats(params);
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, 
"normal", DATASOURCE));
-    Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED, 
DATASOURCE));
+    Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED, 
DATASOURCE_STAT_KEY));
 
     EasyMock.verify(mockPeon);
   }
@@ -705,7 +706,7 @@ public class RunRulesTest
 
     CoordinatorRunStats stats = runDutyAndGetStats(params);
     Assert.assertFalse(stats.hasStat(Stats.Segments.DROPPED));
-    Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED, 
DATASOURCE));
+    Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED, 
DATASOURCE_STAT_KEY));
 
     EasyMock.verify(mockPeon);
   }
@@ -1188,7 +1189,7 @@ public class RunRulesTest
             .build();
 
     CoordinatorRunStats stats = runDutyAndGetStats(params);
-    final RowKey tierRowKey = RowKey.builder().add(Dimension.TIER, 
DruidServer.DEFAULT_TIER).build();
+    final RowKey tierRowKey = RowKey.of(Dimension.TIER, 
DruidServer.DEFAULT_TIER);
     Assert.assertEquals(
         dataSegment.getSize() * numReplicants,
         stats.get(Stats.Tier.REQUIRED_CAPACITY, tierRowKey)
@@ -1250,7 +1251,7 @@ public class RunRulesTest
             .build();
 
     CoordinatorRunStats stats = runDutyAndGetStats(params);
-    final RowKey tierRowKey = RowKey.builder().add(Dimension.TIER, 
DruidServer.DEFAULT_TIER).build();
+    final RowKey tierRowKey = RowKey.of(Dimension.TIER, 
DruidServer.DEFAULT_TIER);
     Assert.assertEquals(
         dataSegment.getSize() * numReplicants,
         stats.get(Stats.Tier.REQUIRED_CAPACITY, tierRowKey)
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
index 668013f7ff..c7e3b5b239 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.server.coordinator.rules;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -77,11 +76,8 @@ public class LoadRuleTest
   private ListeningExecutorService exec;
   private BalancerStrategy balancerStrategy;
 
-  private CachingCostBalancerStrategy cachingCostBalancerStrategy;
-
   private SegmentLoadQueueManager loadQueueManager;
   private final boolean useRoundRobinAssignment;
-  private BalancerStrategy mockBalancerStrategy;
 
   private final AtomicInteger serverId = new AtomicInteger();
 
@@ -101,9 +97,6 @@ public class LoadRuleTest
   {
     exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, 
"LoadRuleTest-%d"));
     balancerStrategy = new 
CostBalancerStrategyFactory().createBalancerStrategy(exec);
-    cachingCostBalancerStrategy = new 
CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
-
-    mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class);
     loadQueueManager = new SegmentLoadQueueManager(null, null, null);
   }
 
@@ -119,13 +112,7 @@ public class LoadRuleTest
     final LoadQueuePeon mockPeon = createEmptyPeon();
     mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject());
     EasyMock.expectLastCall().atLeastOnce();
-
-    if (!useRoundRobinAssignment) {
-      
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(),
 EasyMock.anyObject()))
-              .andDelegateTo(balancerStrategy)
-              .times(2);
-    }
-    EasyMock.replay(mockPeon, mockBalancerStrategy);
+    EasyMock.replay(mockPeon);
 
     DruidCluster druidCluster = DruidCluster
         .builder()
@@ -140,14 +127,10 @@ public class LoadRuleTest
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T1, DS_WIKI));
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T2, DS_WIKI));
 
-    EasyMock.verify(mockPeon, mockBalancerStrategy);
+    EasyMock.verify(mockPeon);
   }
 
-  private CoordinatorRunStats runRuleAndGetStats(
-      LoadRule rule,
-      DataSegment segment,
-      DruidCluster cluster
-  )
+  private CoordinatorRunStats runRuleAndGetStats(LoadRule rule, DataSegment 
segment, DruidCluster cluster)
   {
     return runRuleAndGetStats(rule, segment, 
makeCoordinatorRuntimeParams(cluster, segment));
   }
@@ -160,7 +143,7 @@ public class LoadRuleTest
   {
     final StrategicSegmentAssigner segmentAssigner = 
params.getSegmentAssigner();
     rule.run(segment, segmentAssigner);
-    return segmentAssigner.getStats();
+    return params.getCoordinatorStats();
   }
 
   private DruidCoordinatorRuntimeParams makeCoordinatorRuntimeParams(
@@ -171,7 +154,7 @@ public class LoadRuleTest
     return DruidCoordinatorRuntimeParams
         .newBuilder(DateTimes.nowUtc())
         .withDruidCluster(druidCluster)
-        .withBalancerStrategy(mockBalancerStrategy)
+        .withBalancerStrategy(balancerStrategy)
         .withUsedSegmentsInTest(usedSegments)
         .withDynamicConfigs(
             CoordinatorDynamicConfig.builder()
@@ -189,16 +172,7 @@ public class LoadRuleTest
     final LoadQueuePeon mockPeon = createEmptyPeon();
     mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject());
     EasyMock.expectLastCall().atLeastOnce();
-
-    LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
-
-    final DataSegment segment = createDataSegment(DS_WIKI);
-
-    
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(),
 EasyMock.anyObject()))
-            .andDelegateTo(balancerStrategy)
-            .anyTimes();
-
-    EasyMock.replay(mockPeon, mockBalancerStrategy);
+    EasyMock.replay(mockPeon);
 
     ImmutableDruidServer server1 = 
createServer(Tier.T1).toImmutableDruidServer();
     ImmutableDruidServer server2 = 
createServer(Tier.T1).toImmutableDruidServer();
@@ -207,6 +181,8 @@ public class LoadRuleTest
         .addTier(Tier.T1, new ServerHolder(server1, mockPeon), new ServerHolder(server2, mockPeon))
         .build();
 
+    final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
+    final DataSegment segment = createDataSegment(DS_WIKI);
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, segment.getDataSource()));
 
@@ -223,7 +199,7 @@ public class LoadRuleTest
 
     Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED));
 
-    EasyMock.verify(mockPeon, mockBalancerStrategy);
+    EasyMock.verify(mockPeon);
   }
 
   @Test
@@ -233,16 +209,7 @@ public class LoadRuleTest
     final LoadQueuePeon emptyPeon = createEmptyPeon();
     emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
     EasyMock.expectLastCall().atLeastOnce();
-
-    LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
-
-    final DataSegment segment = createDataSegment(DS_WIKI);
-
-    EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
-            .andDelegateTo(balancerStrategy)
-            .anyTimes();
-
-    EasyMock.replay(emptyPeon, mockBalancerStrategy);
+    EasyMock.replay(emptyPeon);
 
     ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer();
     ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer();
@@ -251,6 +218,8 @@ public class LoadRuleTest
         .addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon))
         .build();
 
+    final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
+    final DataSegment segment = createDataSegment(DS_WIKI);
     CoordinatorRunStats stats = runRuleAndGetStats(
         rule,
         segment,
@@ -277,7 +246,7 @@ public class LoadRuleTest
 
     Assert.assertEquals(1L, statsAfterLoadPrimary.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
 
-    EasyMock.verify(emptyPeon, mockBalancerStrategy);
+    EasyMock.verify(emptyPeon);
   }
 
   @Test
@@ -286,16 +255,7 @@ public class LoadRuleTest
     final LoadQueuePeon emptyPeon = createEmptyPeon();
     emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
     EasyMock.expectLastCall().atLeastOnce();
-
-    LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
-
-    final DataSegment segment = createDataSegment(DS_WIKI);
-
-    EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
-            .andDelegateTo(balancerStrategy)
-            .anyTimes();
-
-    EasyMock.replay(emptyPeon, mockBalancerStrategy);
+    EasyMock.replay(emptyPeon);
 
     ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer();
     ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer();
@@ -304,6 +264,8 @@ public class LoadRuleTest
         .addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon))
         .build();
 
+    final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
+    final DataSegment segment = createDataSegment(DS_WIKI);
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
 
     // Ensure that the segment is assigned to one of the historicals
@@ -323,14 +285,12 @@ public class LoadRuleTest
 
     Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED));
 
-    EasyMock.verify(emptyPeon, mockBalancerStrategy);
+    EasyMock.verify(emptyPeon);
   }
 
   @Test
   public void testLoadUsedSegmentsForAllSegmentGranularityAndCachingCostBalancerStrategy()
   {
-    LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
-
     final List<DataSegment> segments =
         CreateDataSegments.ofDatasource(DS_WIKI)
                           .forIntervals(1, Granularities.ALL)
@@ -338,21 +298,18 @@ public class LoadRuleTest
                           .eachOfSizeInMb(100);
 
     final LoadQueuePeon loadingPeon = createLoadingPeon(segments.get(0), true);
-
     loadingPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
     EasyMock.expectLastCall().once();
-
-    EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
-            .andDelegateTo(cachingCostBalancerStrategy)
-            .anyTimes();
-
-    EasyMock.replay(loadingPeon, mockBalancerStrategy);
+    EasyMock.replay(loadingPeon);
 
     DruidCluster druidCluster = DruidCluster
         .builder()
         .addTier(Tier.T1, createServerHolder(Tier.T1, loadingPeon, false))
         .build();
 
+    balancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
+
+    LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
     CoordinatorRunStats stats = runRuleAndGetStats(
         rule,
         segments.get(1),
@@ -360,7 +317,7 @@ public class LoadRuleTest
     );
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
 
-    EasyMock.verify(loadingPeon, mockBalancerStrategy);
+    EasyMock.verify(loadingPeon);
   }
 
   @Test
@@ -369,10 +326,7 @@ public class LoadRuleTest
     final LoadQueuePeon mockPeon = createEmptyPeon();
     mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
     EasyMock.expectLastCall().atLeastOnce();
-    EasyMock.expect(mockBalancerStrategy.pickServersToDropSegment(EasyMock.anyObject(), EasyMock.anyObject()))
-            .andDelegateTo(balancerStrategy)
-            .times(4);
-    EasyMock.replay(mockPeon, mockBalancerStrategy);
+    EasyMock.replay(mockPeon);
 
     final DataSegment segment = createDataSegment(DS_WIKI);
 
@@ -407,14 +361,7 @@ public class LoadRuleTest
     final LoadQueuePeon mockPeon = createEmptyPeon();
     mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
     EasyMock.expectLastCall().atLeastOnce();
-
-    if (!useRoundRobinAssignment) {
-      EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
-              .andDelegateTo(balancerStrategy)
-              .times(1);
-    }
-
-    EasyMock.replay(mockPeon, mockBalancerStrategy);
+    EasyMock.replay(mockPeon);
 
     DruidCluster druidCluster = DruidCluster
         .builder()
@@ -428,7 +375,7 @@ public class LoadRuleTest
 
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
 
-    EasyMock.verify(mockPeon, mockBalancerStrategy);
+    EasyMock.verify(mockPeon);
   }
 
   @Test
@@ -437,7 +384,7 @@ public class LoadRuleTest
     final LoadQueuePeon mockPeon = createEmptyPeon();
     mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
     EasyMock.expectLastCall().atLeastOnce();
-    EasyMock.replay(mockPeon, mockBalancerStrategy);
+    EasyMock.replay(mockPeon);
 
     final DataSegment segment = createDataSegment(DS_WIKI);
 
@@ -460,19 +407,12 @@ public class LoadRuleTest
 
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, DS_WIKI));
 
-    EasyMock.verify(mockPeon, mockBalancerStrategy);
+    EasyMock.verify(mockPeon);
   }
 
   @Test
   public void testMaxLoadingQueueSize()
   {
-    if (!useRoundRobinAssignment) {
-      EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
-              .andDelegateTo(balancerStrategy)
-              .times(2);
-    }
-    EasyMock.replay(mockBalancerStrategy);
-
     final LoadQueuePeonTester peon = new LoadQueuePeonTester();
 
     final int maxSegmentsInQueue = 2;
@@ -482,10 +422,7 @@ public class LoadRuleTest
             Tier.T1,
             new ServerHolder(
                 createServer(Tier.T1).toImmutableDruidServer(),
-                peon,
-                false,
-                maxSegmentsInQueue,
-                10
+                peon, false, maxSegmentsInQueue, 10
             )
         )
         .build();
@@ -497,7 +434,7 @@ public class LoadRuleTest
     DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
         .newBuilder(DateTimes.nowUtc())
         .withDruidCluster(druidCluster)
-        .withBalancerStrategy(mockBalancerStrategy)
+        .withBalancerStrategy(balancerStrategy)
         .withUsedSegmentsInTest(dataSegment1, dataSegment2, dataSegment3)
         .withDynamicConfigs(
             CoordinatorDynamicConfig.builder()
@@ -517,8 +454,6 @@ public class LoadRuleTest
     Assert.assertEquals(1L, stats1.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, dataSegment1.getDataSource()));
     Assert.assertEquals(1L, stats2.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, dataSegment2.getDataSource()));
     Assert.assertEquals(0L, stats3.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, dataSegment3.getDataSource()));
-
-    EasyMock.verify(mockBalancerStrategy);
   }
 
   /**
@@ -530,14 +465,7 @@ public class LoadRuleTest
   {
     final LoadQueuePeon mockPeon1 = createEmptyPeon();
     final LoadQueuePeon mockPeon2 = createOneCallPeonMock();
-
-    if (!useRoundRobinAssignment) {
-      EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
-              .andDelegateTo(balancerStrategy)
-              .times(1);
-    }
-
-    EasyMock.replay(mockPeon1, mockPeon2, mockBalancerStrategy);
+    EasyMock.replay(mockPeon1, mockPeon2);
 
     DruidCluster druidCluster = DruidCluster
         .builder()
@@ -550,7 +478,7 @@ public class LoadRuleTest
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
 
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI));
-    EasyMock.verify(mockPeon1, mockPeon2, mockBalancerStrategy);
+    EasyMock.verify(mockPeon1, mockPeon2);
   }
 
   /**
@@ -572,15 +500,6 @@ public class LoadRuleTest
     ServerHolder holder4 = createServerHolder(Tier.T2, mockPeon4, false);
 
     final DataSegment segment = createDataSegment(DS_WIKI);
-    if (!useRoundRobinAssignment) {
-      EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(segment, ImmutableList.of(holder2)))
-              .andReturn(Collections.singletonList(holder2).iterator());
-      EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(segment, ImmutableList.of(holder4, holder3)))
-              .andReturn(Arrays.asList(holder3, holder4).iterator());
-    }
-
-    EasyMock.replay(mockBalancerStrategy);
-
     DruidCluster druidCluster = DruidCluster
         .builder()
         .addTier(Tier.T1, holder1, holder2)
@@ -593,7 +512,7 @@ public class LoadRuleTest
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
     Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI));
 
-    EasyMock.verify(mockPeon1, mockPeon2, mockPeon3, mockPeon4, mockBalancerStrategy);
+    EasyMock.verify(mockPeon1, mockPeon2, mockPeon3, mockPeon4);
   }
 
   /**
@@ -606,10 +525,7 @@ public class LoadRuleTest
     final LoadQueuePeon mockPeon = createEmptyPeon();
     mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
     EasyMock.expectLastCall().times(2);
-    EasyMock.expect(mockBalancerStrategy.pickServersToDropSegment(EasyMock.anyObject(), EasyMock.anyObject()))
-            .andDelegateTo(balancerStrategy)
-            .times(4);
-    EasyMock.replay(mockPeon, mockBalancerStrategy);
+    EasyMock.replay(mockPeon);
 
     final DataSegment segment1 = createDataSegment("foo1");
     final DataSegment segment2 = createDataSegment("foo2");
@@ -652,10 +568,6 @@ public class LoadRuleTest
     final LoadQueuePeon mockPeon1 = new LoadQueuePeonTester();
     final LoadQueuePeon mockPeon2 = new LoadQueuePeonTester();
     final LoadQueuePeon mockPeon3 = new LoadQueuePeonTester();
-    EasyMock.expect(mockBalancerStrategy.pickServersToDropSegment(EasyMock.anyObject(), EasyMock.anyObject()))
-            .andDelegateTo(balancerStrategy)
-            .times(4);
-    EasyMock.replay(mockBalancerStrategy);
 
     final DataSegment segment1 = createDataSegment(DS_WIKI);
 
@@ -751,11 +663,11 @@ public class LoadRuleTest
     return mockPeon2;
   }
 
-  private ServerHolder createServerHolder(String tier, LoadQueuePeon mockPeon1, boolean isDecommissioning)
+  private ServerHolder createServerHolder(String tier, LoadQueuePeon loadQueuePeon, boolean isDecommissioning)
   {
     return new ServerHolder(
         createServer(tier).toImmutableDruidServer(),
-        mockPeon1,
+        loadQueuePeon,
         isDecommissioning
     );
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to